【Linux】命名管道的妙用:实现进程控制与实时字符交互_命名管道的交互效率
🏠 大家好,我是Yui_,一位努力学习C++/Linux的博主~💬
🍑 如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🚀 如有不懂,可以随时向我提问,我会全力讲解~
🔥 如果感觉博主的文章还不错的话,希望大家关注、点赞、收藏三连支持一下博主哦~!
🔥 你们的支持是我创作的动力!
🧸 我相信现在的努力的艰辛,都是为以后的美好最好的见证!
🧸 人的心态决定姿态!
💬 欢迎讨论:如有疑问或见解,欢迎在评论区留言互动。
👍 点赞、收藏与分享:如觉得这篇文章对您有帮助,请点赞、收藏并分享!
🚀 分享给更多人:欢迎分享给更多对 Linux 感兴趣的朋友,一起学习!
为了巩固命名管道和以前知识的学习,我们可以利用学过的知识来使用两个小程序:进程池和实时字符读取。
文章目录
- 1. 进程池
- 2. 进程池的功能
-
- 2.1 可被执行的任务
- 2.2 进程控制(重点)
-
- 2.2.1 子进程类
- 2.2 进程控制
-
- 2.2.2 创建子进程
- 2.2.3 子进程等待任务
- 2.2.4 展示空闲进程
- 2.2.5 分配任务给子进程
- 2.2.6 回收子进程
- 代码
-
- 运行结果
- 3. 实时读取字符
-
- 3.1 公共区域
- 3.2 客户端
- 3.3 服务端
- 效果图
- 4.总结
1. 进程池
在匿名管道学习后,我们已经实现了一个基于匿名管道的简单进程控制,现在我们学习了命名管道来试试用命名管道来实现进程控制吧。
正在匿名管道时,我们已经实现了进程的控制,命名管道只要在其基础上进行一些修改就可以了。
2. 进程池的功能
2.1 可被执行的任务
进程池是为了实现对进程的控制,通过父进程对众多子进程的管理,实现高效的进程执行任务。
我们可以先虚构一些任务出来,后续的子进程就来执行这些任务:
- 打印日志
- 将数据插入数据库
- 请求网络
可以用这些任务可以写成单独的函数,然后用一个类来管理这些任务。
类的成员变量:
利用哈希表来存储这些任务
类的功能有: - 展示可执行的任务
- 执行任务
具体的代码如下:
typedef void(*task_t)(void);/*函数指针*///任务集void printLog(){ std::cout<<\"PID:\"<<getpid()<<\",正在打印日志...\"<<std::endl;}void MySQLInsert(){ std::cout<<\"PID\"<<getpid()<<\",正在将数据插入数据库...\"<<std::endl;}void NetRequest(){ std::cout<<\"PID\"<<getpid()<<\",正在请求网络...\"<<std::endl;}class Task{private: std::unordered_map<std::string,task_t> st_;public: Task() { st_ = {{\"打印日志\",printLog},{\"插入数据\",MySQLInsert},{\"请求网络\",NetRequest}}; } void showTask()/*展示任务*/ { std::cout<<\"可选任务:\"<<std::endl; std::cout<<\" {\"; for(auto&s:st_) { std::cout<<s.first<<\' \'; } std::cout<<\'}\'<<std::endl; } void curTask(const std::string& t)/*执行任务*/ { if(st_.find(t) == st_.end()) { std::cout<<\"没有这个任务哦~\"<<std::endl; } else { st_[t]();/*调用任务*/ } }};
2.2 进程控制(重点)
进程控制是进程池的重点、核心。我们实现进程池也就是为了执行对进程的控制。
如何控制进程呢?
- 创建进程
- 展示空闲子进程
- 子进程等待任务的发配
- 父进程为子进程发配任务
- 杀死/回收进程
2.2.1 子进程类
为了实现这些功能,同样我们用一个类来封装它们。除此之外,还记得我们在实现基于匿名管道的进程控制吗?我们还需要一个类来描述子进程。因为我们需要知道父进程的每个写端的文件描述符和哪个子进程通过命名管道建立起来联系。
所以子进程类具有的属性就有了:
- 子进程的PID
- 写端的文件描述符
- 编号
- 名字
其中第3和第4个属性是为了方便父进程管理而加入的,后续进程控制类需要用数组将各个子进程存储起来。
子进程类代码如下:
class SubProcesses{private: pid_t id_; int processNum_; int wfd_; std::string name_; static int cnt_;public: SubProcesses(pid_t id,int wfd) :id_(id),processNum_(cnt_++),wfd_(wfd) { char buff[SIZE]{0}; snprintf(buff,SIZE,\"Process %d | pid:wfd [%d:%d]\",processNum_,id_,wfd_); name_ = buff; } pid_t getId() { return id_; } std::string getName() { return name_; } int getWfd() { return wfd_; }};int SubProcesses::cnt_ = 1;
2.2 进程控制
那么进程类会具有什么属性呢?我们需要将创建出子进程进行管理可以用一个vecotr
来对其进行管理,还需要知道我们需要创建的子进程数量,因为是基于命名管道的进程控制,为此我们还必须对个个进程管道进行管理,同样用vector
将它们装起来。除此之外在来个权限掩码也可以。
如此进程控制类的成员变量:
- 管理着子进程的
vector
类。 - 管理着命名管道的
vector
类。 - 记录需要创建的子进程数目。
- 权限掩码。
下面开始实现函数:
2.2.2 创建子进程
创建子进程,创建子进程同样还是用到fork
函数,不同的是不在需要pipe
函数来创建管道了。现在已经变成了mkfifo
函数。
和匿名管道那一样,我们要记得关闭子进程的写端,尽管子进程并没有主动打开写端,但是因为子进程继承父进程的缘故,依然会吧父进程的写端给继承过了的。一点定是要关闭的。
void createProcess(){std::vector<int> vfd;/*关闭子进程的写端*/for(int i = 1;i<=subProcessNum_;++i){std::string fifoName = \"fifo-\"+std::to_string(i); int ret = mkfifo(fifoName.c_str(),mode_);/*创建命名管道*/if(ret == -1){perror(\"mkfifo\");exit(1);}namePipe_.push_back(fifoName);/*存入命名管道*/pid_t id = fork();/*创建子进程*/if(id < 0){perror(\"fork\");exit(1);}else if(id == 0){//子进程for(auto&fd:vfd){close(fd);}int rfd = open(fifoName.c_str(),O_RDONLY);if(rfd == -1){perror(\"open\");exit(1);}//开始等待父进程的指令waitTask(rfd); /*子进程等待任务委派*/close(rfd);exit(0);}//父进程int wfd = open(fifoName.c_str(),O_WRONLY);if(wfd == -1){perror(\"open\");exit(1);}subProcess_.push_back({id,wfd});/*存入子进程信息*/fifoName.push_back(wfd);}}
2.2.3 子进程等待任务
死循环等待任务,当写端关闭再退出。
void waitTask(int rfd)/*子进程等待任务委派*/{while(true){char buff[SIZE];int n = read(rfd,buff,SIZE);if(n == -1){perror(\"read\");exit(1);}else if(n>0){Task().curTask((std::string)buff);}else if(n == 0){/*证明写端没有写消息了*/std::cout<<\"写端已经关闭,读端也即将关闭!\"<<std::endl;break;}}}
2.2.4 展示空闲进程
/*展示可选进程*/ void showProcess() { std::cout<<\"目前可用进程有:\"<<std::endl; int i = 0; std::cout<<\"|\"; for(auto&x:subProcess_) { std::cout<<\"进程编号:\"<<(i++)<<\"进程PID:\"<<x.getId()<<\"| \"; } }
2.2.5 分配任务给子进程
该函数的功能就分配任务,由用户自己选择子进程来执行任务。
/*下达任务给子进程*/void sendTask(){std::cout<<\"------------------------\"<<std::endl;while(true){int input = 0;do{showProcess();std::cout<>input; }while(input=subProcessNum_);Task().showTask();std::string taskName;std::cout<>taskName;if(taskName == \"exit\"){break;}std::cout< \"<<subProcess_[input].getName()<<\" 执行\"<<taskName<<\" 任务\"<<std::endl;write(subProcess_[input].getWfd(),taskName.c_str(),taskName.size());sleep(1);}}
2.2.6 回收子进程
回收子进程的同时也不能忘记关闭文件描述符和关闭命名管道
/*关闭写端、删除文件、等待子进程退出*/ void waitProcess() { for(int i = 0;i<subProcessNum_;++i) { close(subProcess_[i].getWfd()); unlink(namePipe_[i].c_str()); waitpid(subProcess_[i].getId(),nullptr,0); } std::cout<<\"所有子进程已回收!\"<<std::endl; }
代码
#include #include #include #include #include #include #include #include #include #include /** * 该文件为进程池的公共代码段 * 进程池为利用命名管道实现的,简称命名管道进程池 * 代码的主要功能包括:创建进程,控制进程实现委派给它的任务,等待进程 */#define SIZE 1024typedef void(*task_t)(void);//任务集void printLog(){ std::cout<<\"PID:\"<<getpid()<<\",正在打印日志...\"<<std::endl;}void MySQLInsert(){ std::cout<<\"PID\"<<getpid()<<\",正在将数据插入数据库...\"<<std::endl;}void NetRequest(){ std::cout<<\"PID\"<<getpid()<<\",正在请求网络...\"<<std::endl;}class Task{private: std::unordered_map<std::string,task_t> st_;public: Task() { st_ = {{\"打印日志\",printLog},{\"插入数据\",MySQLInsert},{\"请求网络\",NetRequest}}; } void showTask()/*展示任务*/ { std::cout<<\"可选任务:\"<<std::endl; std::cout<<\" {\"; for(auto&s:st_) { std::cout<<s.first<<\' \'; } std::cout<<\'}\'<<std::endl; } void curTask(const std::string& t)/*执行任务*/ { if(st_.find(t) == st_.end()) { std::cout<<\"没有这个任务哦~\"<<std::endl; } else { st_[t]();/*调用任务*/ } }};class SubProcesses{private: pid_t id_; int processNum_; int wfd_; std::string name_; static int cnt_;public: SubProcesses(pid_t id,int wfd) :id_(id),processNum_(cnt_++),wfd_(wfd) { char buff[SIZE]{0}; snprintf(buff,SIZE,\"Process %d | pid:wfd [%d:%d]\",processNum_,id_,wfd_); name_ = buff; } pid_t getId() { return id_; } std::string getName() { return name_; } int getWfd() { return wfd_; }};int SubProcesses::cnt_ = 1;class ProcessCtrl{private: std::vector<SubProcesses> subProcess_;/*子进程信息表*/ std::vector<std::string> namePipe_;/*命名管道信息表*/ int subProcessNum_ ;/*需要创建的子进程数目*/ mode_t mode_ ;public: ProcessCtrl(int subProcessNum = 3,mode_t mode = 0666) :subProcessNum_(subProcessNum),mode_(mode) {createProcess();/*开始创建子进程*/} void createProcess() { std::vector<int> vfd;/*关闭子进程的写端*/ for(int i = 1;i<=subProcessNum_;++i) { std::string fifoName = \"fifo-\"+std::to_string(i); int ret = mkfifo(fifoName.c_str(),mode_);/*创建命名管道*/ if(ret == -1) { perror(\"mkfifo\"); exit(1); } namePipe_.push_back(fifoName);/*存入命名管道*/ pid_t id = fork();/*创建子进程*/ if(id < 0) { perror(\"fork\"); exit(1); } else if(id == 0) { //子进程 for(auto&fd:vfd) { close(fd); } int rfd = open(fifoName.c_str(),O_RDONLY); if(rfd == -1) { perror(\"open\"); exit(1); } //开始等待父进程的指令 waitTask(rfd); /*子进程等待任务委派*/ close(rfd); exit(0); } //父进程 int wfd = open(fifoName.c_str(),O_WRONLY); if(wfd == -1) { perror(\"open\"); exit(1); } subProcess_.push_back({id,wfd});/*存入子进程信息*/ fifoName.push_back(wfd); } } void waitTask(int rfd)/*子进程等待任务委派*/ { while(true) { char buff[SIZE]; int n = read(rfd,buff,SIZE); if(n == -1) { perror(\"read\"); exit(1); } else if(n>0) { Task().curTask((std::string)buff); } else if(n == 0) { /*证明写端没有写消息了*/ std::cout<<\"写端已经关闭,读端也即将关闭!\"<<std::endl; break; } } } /*展示可选进程*/ void showProcess() { std::cout<<\"目前可用进程有:\"<<std::endl; int i = 0; std::cout<<\"|\"; for(auto&x:subProcess_) { std::cout<<\"进程编号:\"<<(i++)<<\"进程PID:\"<<x.getId()<<\"| \"; } std::cout<<std::endl; } /*下达任务给子进程*/ void sendTask() { std::cout<<\"------------------------\"<<std::endl; while(true) { int input = 0; do { showProcess(); std::cout<<\"请选择子进程#\"; std::cin>>input; }while(input<0||input>=subProcessNum_); Task().showTask(); std::string taskName; std::cout<<\"请选择任务#\"; std::cin>>taskName; if(taskName == \"exit\") { break; } std::cout<<\"选择进程-> \"<<subProcess_[input].getName()<<\" 执行\"<<taskName<<\" 任务\"<<std::endl; write(subProcess_[input].getWfd(),taskName.c_str(),taskName.size()); sleep(1); } } /*关闭写端、删除文件、等待子进程退出*/ void waitProcess() { for(int i = 0;i<subProcessNum_;++i) { close(subProcess_[i].getWfd()); unlink(namePipe_[i].c_str()); waitpid(subProcess_[i].getId(),nullptr,0); } std::cout<<\"所有子进程已回收!\"<<std::endl; }};int main(){ ProcessCtrl pp; pp.sendTask(); return 0;}
运行结果
3. 实时读取字符
我们还可以通过命名管道来实现字符的实时读取,还挺有意思的,为了实现这个功能,我们不仅需要会使用命名管道,还有如system()和fflush()
函数。
3.1 公共区域
#include #include #include #include #include #include #include #include #define SIZE 1024//命名管道const char* namePipe = \"./fifo\";//权限掩码const mode_t mode = 0666;
3.2 客户端
客户端用来向服务端发送消息
/** * 该文件为客户端,用来实时向服务端发送字符 */#include \"common.hpp\"int main(){ /**主要步骤: * 1.创建命名管道 * 2.打开命名管道 * 3.写入字符到命名管道当中 * 4.关闭命名管道 * */ int mk = mkfifo(namePipe,mode); if(mk == -1) { perror(\"mkfifo\"); exit(1); } int fd = open(namePipe,O_WRONLY);//写方式打开 if(fd == -1) { perror(\"open\"); exit(1); } std::cout<<\"开始输入字符:ctrl+c退出\"<<std::endl; while(true) { system(\"stty raw\");/*调用 shell 命令 `stty raw` 的,它的作用是将终端设置为 原始模式(raw mode)。*/ int c = getchar(); system(\"stty -raw\");/*用于将终端从 原始模式(raw mode) 恢复到 规范模式(cooked mode)。*/ if(c == 3)/*ctrl+c == 3 */ { std::cout<<\"exit!\"<<std::endl; break; } ssize_t n = write(fd,(char*)&c,sizeof(char)); assert(n>=0); (void)n;/*消除未使用警告*/ } close(fd); unlink(namePipe); return 0;}
3.3 服务端
/** * 该文件为服务端,用来实时接受客户端输出的字符 */#include \"common.hpp\"int main(){ /** * 实时读取客户端发来的字符 * 主要功能: * * 1.已读方式打开命名管道文件 * 2.利用fflush实时刷新缓冲区的字符 * 3.关闭文件描述符 */ int fd = open(namePipe,O_RDONLY); assert(fd!=-1); while(true) { char buff[SIZE]{0}; ssize_t n = read(fd,buff,SIZE-1); if(n>0) { buff[n] = 0; printf(\"%c\",buff[0]); fflush(stdout); } else if(n == 0) { std::cout<<std::endl; std::cout<<\"写端退出,终止读端\"<<std::endl; break; } else { perror(\"read\"); close(fd); exit(1); } } close(fd); return 0;}
效果图
屏幕录制 2024-11-20 202850
往期Linux文章:Linux专栏
4.总结
通过命名管道实现了这两个简单的小程序,其实这些小程序的本质都是一样的:创建命名管道 -> 打开命名管道 -> 通信 -> 关闭命名管道,掌握其中一个即可融会贯通!
感谢阅读。