Linux《进程间通信(下)》_双核一个核跑linux一个跑rtos如何在rtos端写一个接口接收linux端的结构体
在之前的Linux《进程间通信(上)》当中我们已经了解了进程通信基本的概念,并且了解了匿名管道和命名管道两大进程通信的方式,那么接下来在本篇当中我们就将继续之前的学习,在本篇当中将学习System V进程通信方式的基本原理和对应的接口的使用方法,会了解到System V通信当中的共享内存、消息队列和信号量,在学习过程之中会了解到不同通信方式的的核心原理以及和之前学习的通信方式的差别。接下来就开始本篇的学习,一起加油吧!!!
1.System V共享内存
1.1共享内存原理
在了解System V通信当中的共享内存通信方式之前首先要来了解System V,实际上System V 是 Unix 的一个经典版本,其定义的 IPC 机制(消息队列、信号量、共享内存)被称为 System V IPC,名称中的 \"V\" 是罗马数字 5,代表 AT&T Unix 的第五代主要发行版,简单来说就是System V就是一种进程通信的标准。在Linux内核当中支持了这种标准,专门设计了一个IPC通信模块,通信设计的原理和接口都很类似。
接下来就来了解System V当中的其中之一的通信方式——共享内存。
在共享内存当中为了让不同的进程看到同一份资源,在此同一份资源就是一块物理内存。实际上就是将内存当中的一块内存都是映射到两个要进行通信的进程的虚拟内存空间上。
实际上以上图示进行的操作都是操作系统执行的,包括从物理内存当中申请空间以及将物理内存和进程内的虚拟地址空间建立映射。操作系统同时提供了对应的系统调用来能实现以上的操作。接下来我们将会了解对应的接口并了解该如何使用。
实际上要让指定的两个进程使用共享内存进行进程间的通信实际上需要进行两步操作:1.在物理内存当中申请对应的通信区域 ,2.将物理内存当中的区域与两个进程的虚拟内存空间建立关联关系
那么在将共享内存销毁就需要以下的两布:1.首先需要去除两个进程和对应物理内存的关联,2.将对应的物理内存空间释放
实际上共享内存区是最快的IPC形式。⼀旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执⾏进⼊内核的系统调⽤来传递彼此的数据。对应的进程访问相应的共享内存映射的虚拟地址空间就可如同访问虚拟地址当中的栈区一样。
在操作系统当中可能会同时存在多对进程使用共享内存进行通信,那么这时候就需要对这一个个的共享内存进程管理,就如同之前学习进程一样管理的方式就是经典的:先描述再组织。那么这就说明在内核当中一定是存在描述共享内存相关属性的结构体。
确实是这样的,以下就是内核当中共享内存的结构体。
struct shmid_ds { struct ipc_perm shm_perm; /* 操作权限 */ int shm_segsz; /* 共享内存段的大小(以字节为单位) */ __kernel_time_t shm_atime; /* 最后一次附加(attach)时间 */ __kernel_time_t shm_dtime; /* 最后一次分离(detach)时间 */ __kernel_time_t shm_ctime; /* 最后一次变化(例如权限修改)时间 */ __kernel_ipc_pid_t shm_cpid; /* 创建共享内存的进程的 PID */ __kernel_ipc_pid_t shm_lpid; /* 最后操作共享内存的进程的 PID */ unsigned short shm_nattch; /* 当前附加到此共享内存段的进程数量 */ unsigned short shm_unused; /* 兼容性字段(通常不使用) */ void shm_unused2; /* 兼容性字段(通常不使用) */ void shm_unused3; /* 不使用的字段 */};
1.2 共享内存接口
以上我们在了解共享内存相关的概念之后接下来就可以试着来学习共享内存对应的系统调用接口,首先要了解到的是共享内存创建的相同调用shmget。
功能:⽤来创建共享内存 int shmget(key_t key, size_t size, int shmflg);参数:key:这个共享内存段名字size:共享内存⼤⼩shmflg:由九个权限标志构成,它们的⽤法和创建⽂件时使⽤的mode模式标志是⼀样的取值为IPC_CREAT:共享内存不存在,创建并返回;共享内存已存在,获取并返回。取值为IPC_CREAT | IPC_EXCL:共享内存不存在,创建并返回;共享内存已存在,出错返回。返回值:成功返回⼀个⾮负整数,即该共享内存段的标识码;失败返回-1
通过以上就可以看到shmget是有三个参数的,其中第一个参数就是共享内存段的名字,用于标识共享内存的唯一性,实际上要让不同的进行使用共享内存进行通信,那么首先就需要让不同的进程确定好要进行通信的共享内存的名字是什么。
当时问题就来了该如何让两个进程确定好唯一的共享内存呢?
这时你可能会说直接先在一个进程内确定好要使用的共享内存,之后再将该共享内存的名字传给另外一个要进行通信的进程即可。但是我们目的就是为了实现两个进程之间的通信,如果说一个进程都能将数据发给另外一个进程不就已经实现了进程间的通信了吗,那么还要实现共享内存干嘛,这里不就存在矛盾了吗?
因此以上的方式是不可取的,那么这时你可能又想到直接通过遍历内核当中所有共享内存的key来确定要使用的是哪一个,但是这时又会衍生出效率相关的问题:如果内核自己遍历确定 key:1.内核必须先知道你“想访问的共享内存是哪块”,但你没给它名字,它只能去遍历所有已创建的共享内存段并猜测匹配条件(大小?权限?某个附加标识?),2.这样不仅效率低,还容易出现歧义(两个共享内存段可能大小相同、权限相同,但数据不同)。
因此在操作系统当中在共享内存中key定位的最佳方式实际上是通过用户构建并传给OS,OS当中提供以下的系统调用实现key的构建。
#include key_t ftok(const char *pathname, int proj_id);//pathname你指定的一个已存在的文件路径(通常是一个常驻且权限可控的文件,比如 /tmp/ipcfile)。//proj_id一个 0~255 的整数(只取低 8 位),用来区分不同的 IPC 对象。
实际上ftok的作用就是根据用户传入的文件路径以及proj_id生成唯一的key_t值,这样就可以通过该值来唯一的表示共享内存。
shmget的第二的参数就是要创建的共享内存的大小,第三个参数就是对应的标志位,在此最常使用的就是IPC_CREAT 和IPC_EXCL。
其中IPC_CREAT的作用就是创建共享内存,若共享内存不存在就创建,若要创建的共享内存原来不存在就创建对应的共享内存;负责就打开并返回已经存在的共享内存。
IPC_EXCL单独使用无意义,必须要与IPC_CREAT搭配使用,当共享内存不存在就创建,若共享内存已经存在了就出错返回。
那么这时就说明了使用 IPC_CREAT | IPC_EXCL 时成功返回就一定是创建了一个全新的共享内存。
将共享内存创建出来之后接下来就就下来就继续来了解将共享内存关联到进程地址空间的接口shmat
功能:将共享内存段连接到进程地址空间void *shmat(int shmid, const void *shmaddr, int shmflg);参数:shmid: 共享内存标识shmaddr:指定连接的地址shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY返回值:成功返回⼀个指针,指向共享内存第⼀个节;失败返回-1
在此可以看到函数的参数有三·个,其中第一个参数就是之前使用shmget时的返回值,函数的第二个参数是要指定连接的地址,一般是将该参数设置为NULL,系统自动选择地址。函数的第三个参数是附加标志,一般设置为0即可。
那么在了解设置关联的函数之后接下来还需要了解解除关联的函数shmdt。
功能:将共享内存段与当前进程脱离int shmdt(const void *shmaddr);参数:shmaddr: 由shmat所返回的指针返回值:成功返回0;失败返回-1
以上函数的参数就是使用shmat之后函数返回的指针,当成功的解除关联函数的返回值就为0,否则为-1。
注:将共享内存段与当前进程脱离不等于删除共享内存段。
那么要将共享内存真正的删除需要怎么做呢?在此就需要实现要以下的接口shmctl来实现。
功能:⽤于控制共享内存int shmctl(int shmid, int cmd, struct shmid_ds *buf);参数:shmid:由shmget返回的共享内存标识码cmd:将要采取的动作(有三个可取值)buf:指向⼀个保存着共享内存的模式状态和访问权限的数据结构返回值:成功返回0;失败返回-1
在使用shmstl进行共享内存的删除时,函数的第一个参数就为对应共享内存的id,第二个参数使用IPC_RMID,第三个参数使用NULL即可。
1.3 共享内存demo代码
以上我们已经了解了共享内存接口的该如何使用,那么接下来就可以试着使用以上的接口来实现两个进程之间的通信。
在此使用三个文件实现,shm.hpp内实现共享内存各个接口的面向对象话。在server.cc当中实现共享内存的创建,以及向共享内存内写入数据,在slient.cc内实现从共享内存当中读取数据。
实现代码如下所示:
shm.hpp
#include #include #include #include #include const int size = 4096;const int keynum = 0x666;//错误处理宏#define ERR_EXIT(m) \\ do\\ { \\ perror(m); \\ exit(EXIT_FAILURE); \\ } while (0)#define PATH \".\"class Shm{public: Shm(const std::string path, int keynum) : _shmsize(size), _shmstr(nullptr) { _key = ftok(path.c_str(), keynum); if (_key < 0) { ERR_EXIT(\"ftok error!\"); } printf(\"ftok success!\\n\"); } // 创建共享内存段 void Create() { CreateHelper(IPC_CREAT | IPC_EXCL | 0666); } // 获取共享内存段 void Get() { CreateHelper(IPC_CREAT); } // 附加共享内存段到当前进程的地址空间 void Attach() { _shmstr = shmat(_shmid, nullptr, 0); if ((long long)_shmstr < 0) { ERR_EXIT(\"shmat\"); } printf(\"shmat success!\\n\"); } // 获取共享内存段的字符串指针 void *GetStr() { return _shmstr; } ~Shm() { Distory(); } // 使用shmctl获取共享内存段的状态 void GetStatus() { struct shmid_ds buf; if (shmctl(_shmid, IPC_STAT, &buf) < 0) { ERR_EXIT(\"shmctl IPC_STAT\"); } printf(\"共享内存的大小: %zu\\n\", buf.shm_segsz); printf(\"创建共享内存的进程ID: %d\\n\", buf.shm_cpid); printf(\"共享内存的访问权限: %o\\n\", buf.shm_perm.mode); printf(\"共享内存的key值: %d\\n\", buf.shm_perm.__key); printf(\"共享内存的ID: %d\\n\", _shmid); }private: // 辅助函数,用于创建共享内存段 void CreateHelper(int shm) { _shmid = shmget(_key, size, shm); if (_shmid < 0) { ERR_EXIT(\"shm error\"); } printf(\"shm create success!\\n\"); } // 销毁共享内存段 void Distory() { int shmd = shmctl(_shmid, IPC_RMID, nullptr); if (shmd < 0) { ERR_EXIT(\"shmctl\"); } printf(\"shm destory success!\\n\"); }private: key_t _key; int _shmid; // 共享内存段的大小 size_t _shmsize; //指向共享内存段的指针 void *_shmstr;};
server.cc
#include \"shm.hpp\"#include int main(){ Shm s(PATH, keynum); // 创建共享内存段 s.Create(); // 附加共享内存段到当前进程的地址空间 s.Attach(); // 获取共享内存段的状态 s.GetStatus(); // 获取共享内存段的字符串指针 char *arr = (char *)s.GetStr(); int index=0; for (char i = \'A\'; i <= \'Z\'; i++,index+=2) { sleep(1); arr[index] = i; arr[index+1]=i; arr[index+2]=0; } return 0;}
client.cc
#include \"shm.hpp\"#include#include int main(){ Shm s(PATH, keynum); //得到共享内存段 s.Get(); // 附加共享内存段到当前进程的地址空间 s.Attach(); // 获取共享内存段的字符串指针 char *arr = (char *)s.GetStr(); while (1) { printf(\"%s\\n\", (char *)s.GetStr()); sleep(1); if(arr[strlen(arr) - 1] == \'Z\') { break; } } return 0;}
编译以上的代码,执行就可以发现确实能满足我们的要求,实现了两个进程之间的通信。
以上就会发现是我们自己修正了两个进程之间通信的同步,如果没有进行同步那么就会出现写端的进程还在写的时候读端就在读了,那么这就说明共享内存本身并没有内置的同步机制,它只是提供了一块允许多个进程同时访问的内存区域。在多个进程并发读写共享内存时,没有自动的同步控制,因此,必须由开发者显式地使用其他机制来确保数据一致性。
那么这时就需要思考了,要使用什么样的技术来实现同步呢?
实际上正常情况下使用的锁,但是目前我们还未学习到锁相关,因此目前就试着使用之前学习的命名管道来实现共享内存的同步机制。
在原来的目录当中添加fifo.hpp文件,代码如下所示:
#pragma once#include #include #include #include #include #include \"comm.hpp\"#define NAME \"fifo\"#define PATH \".\"class Fifo{public: Fifo(std::string path, std::string name) : _name(name), _path(path) { _fifoname = _path + \"/\" + _name; umask(0); // 新建管道 _n = mkfifo(_fifoname.c_str(), 0666); if (_n < 0) { ERR_EXIT(\"mkfifo\"); } else { std::cout << \"创建管道成功!\" < 0) // 销毁管道 unlink(_fifoname.c_str()); }private: std::string _path; std::string _name; std::string _fifoname; int _n;};class Openfifo{public: Openfifo(std::string path, std::string name) : _name(name), _path(path) { _filename = path + \"/\" + name; } ~Openfifo() { } void Open_for_write() { // 打开管道 _fd = open(_filename.c_str(), O_WRONLY); if (_fd < 0) { ERR_EXIT(\"open fifo\"); } else { std::cout << \"打开管道成功\" << std::endl; } } void open_for_read() { // 打开管道 _fd = open(_filename.c_str(), O_RDONLY); if (_fd < 0) { ERR_EXIT(\"open fifo\"); } else { std::cout << \"打开管道成功\" <0)printf(\"尝试唤醒\\n\"); } //唤醒管道 bool wakeup() { char ch; int ret = read(_fd, &ch, 1); if (ret > 0) { printf(\"唤醒成功!\\n\"); return true; } return false; } void Close() { close(_fd); }private: std::string _path; std::string _name; std::string _filename; int _fd;};
再将原来的server.cc和client.cc修改为如下:
server.cc
#include \"shm.hpp\"#include\"fifo.hpp\"#include int main(){ Shm s(PATH, keynum,CREATE); s.Attr(); Fifo f(PATH,NAME); Openfifo opf(PATH,NAME); opf.Open_for_write(); char *arr = (char *)s.GetStr(); int index=0; for (char i = \'A\'; i <= \'Z\'; i++,index+=2) { sleep(1); arr[index] = i; arr[index+1]=i; arr[index+2]=0; sleep(1); //使用管道唤醒 opf.wait(); // printf(\"%s\\n\",(char*)s.GetStr()); } return 0;}
client.cc
#include \"shm.hpp\"#include \"fifo.hpp\"#include int main(){ Shm s(PATH, keynum, USER); Openfifo opf(PATH, NAME); opf.open_for_read(); char *arr = (char *)s.GetStr(); while (1) { //等待管道唤醒 if (opf.wakeup()) { printf(\"%s\\n\", (char *)s.GetStr()); sleep(2); } else break; } return 0;}
运行效果如下所示:
2. System V消息队列
2.1 消息队列原理
接下来我们就来了解Systems V当中的另外一种通信的方式——消息队列。在操作系统当中实际上在内存当中实现了一块缓冲区来实现两个进程之间的通信,但是和之前了解的共享内存不同的是在此不是直接将缓冲区映射到进程的虚拟内存空间上,而是在缓冲区当中维护一个进程通信的队列,将各个数据封装成为一个个的节点,并且每个节点内都有对应发送方的信息。
并且在消息队列当中实际上是会存在不同进程消息的节点的,那么为什么要设计成这样呢?
实际上是为了提高资源的使用效率,因为如果让每两个进行通信的进程都各自的使用一条消息队列话,那么这是如果有n个进程对进进行通信,那么这是在内存当中就需要同时存在N×(N-1)/2 个队列,数量会呈平方级增长,管理非常麻烦。同时每个消息队列在内核中都有数据结构、等待队列、内存缓冲区,这些都会消耗内核资源。因此实际上在同一个消息队列当中是可以存储不同进程的消息块的,每个消息块当中都会存在一个type,使用该变量来标识进程的发送方。
struct msg { struct msg *next; /* 下一条消息 */ long m_type; /* 消息类型(用户指定) */ size_t m_ts; /* 消息数据长度 */ void *m_text; /* 指向消息数据的指针 */};
以上就是队列当中每个节点存储的实际数据。
我们知道在内存当中可能会存在多条的消息队列,那么不就需要将这些消息队管理起来吗?组织的方法是什么呢?
其实这又涉及到了经典的先描述再组织。每条消息队列都都会存在一个对应的结构体对象,用于描述消息队列的属性信息。结构体形式如下所示:
struct ipc_perm { key_t __key; /* Key supplied to xxxget(2) */ uid_t uid; /* Effective UID of owner */ gid_t gid; /* Effective GID of owner */ uid_t cuid; /* Effective UID of creator */ gid_t cgid; /* Effective GID of creator */ unsigned short mode;/* Permissions */ unsigned short __seq;/* Sequence number */};struct msqid_ds { struct ipc_perm msg_perm; struct msg msg_first; / first message on queue,unused */ struct msg msg_last; / last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* current number of bytes on queue */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* max number of bytes on queue */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */}
那么在此你可能又有问题了,那就是两个进程要使用消息队列进程通信,那么一个进程是知道另外一个进程选择的消息队列是那一个呢?
和共享内存类似,在消息队列当中也是让用户传给要通信的进程对应的key值,那么进程有了key就可以找到对应的消息队列,这样就可以保证两个进程使用的是同一个消息队列。
2.2 消息队列接口
以上我们在了解了消息队列的原理之后接下来就接着来了解消息队列对应的系统调用接口。
首先来了解消息队列创建的系统调用msgget
#include #include #include int msgget(key_t key, int msgflg);参数• key : 某个消息队列的名字 • msgflg :由九个权限标志构成,它们的⽤⽤法和创建⽂⽂件时使⽤⽤的mode模式标志是⼀⼀样的msgflg:取值为IPC_CREAT:消息队列不存在,创建并返回;消息队列已存在,获取并返回。取值为IPC_CREAT | IPC_EXCL:消息队列不存在,创建并返回;消息队列已存在,出错返回返回值如果调用成功,返回值是消息队列的标识符(一个非负整数);如果调用失败,返回值为 -1,并且 errno 会被设置为相应的错误代码。
通过以上就可以看出函数的参数是有两个的,其中第一个参数就是用户进行标识消息队列的对应的key,第二个参数就是创建消息队列的标识位。
实际上通过以上接口的了解就可以发现和共享内存的接口使用完全是类似的, 这时了解起来就很简单了。
接下来在来了解删除消息队列使用的系统调用msgctl
#include #include #include int msgctl(int msqid, int cmd, struct msqid_ds *buf);结构体定义:struct msqid_ds { struct ipc_perm msg_perm; /* 拥有者和权限信息 */ time_t msg_stime; /* 上一次 msgsnd(2) 的时间 */ time_t msg_rtime; /* 上一次 msgrcv(2) 的时间 */ time_t msg_ctime; /* 上一次变更的时间 */ unsigned long __msg_cbytes; /* 队列中当前的字节数(非标准字段) */ msgqnum_t msg_qnum; /* 队列中当前的消息数量 */ msglen_t msg_qbytes; /* 队列允许的最大字节数 */ pid_t msg_lspid; /* 上一次调用 msgsnd(2) 的进程 PID */ pid_t msg_lrpid; /* 上一次调用 msgrcv(2) 的进程 PID */};参数• msgid : 由 msgget 函数返回的消息队列标识码 • cmd :将要采取的动作(有三个可取值)返回值如果调用成功:IPC_STAT、IPC_SET 和 IPC_RMID 返回 0,IPC_INFO 或 MSG_INFO 成功时返回内核内部数组中已使用的最高条目索引(该数组记录了所有消息队列的信息,可以结合多次调用 MSG_STAT 或 MSG_STAT_ANY 获取系统中所有队列的信息),MSG_STAT 或 MSG_STAT_ANY 成功时,返回给定索引对应队列的标识符如果调用失败,返回 -1,并且 errno 会被设置为相应的错误代码。
通过以上就可以看出msgstl参数有三个,分别是消息队列的ID、要采取的动作cmd、以及输出型参数buf用于获取对应结构体内的数据。
要采取的动作的三个取值如下所示:
要将消息队列删除就需要将cmd置为IPC_RMID同时将buf置为nullptr。
接下来再来了解两个消息队列接送消息的系统调用msgsnd和msgrcv
#include #include #include int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);参数• msgid : 由 msgget 函数返回的消息队列标识码 • msgp:是⼀个指针,指针指向准备发送的消息 • msgsz:是msgp指向的消息⻓度,这个⻓度不含保存消息类型的那个long int⻓整型 • msgflg:控制着当前消息队列满或到达系统上限时将要发生的事情, 0即可( msgflg=IPC_NOWAIT 表示队列满不等待,返回 EAGAIN 错误 )。 返回值• 成功返回0;失败返回-1
以上就是向消息队列当中添加对应的消息节点的系统调用,第一个参数就是对应的消息队列id,第二个参数是msgbuf的结构体指针,结构体的形式如下需要我们自己在代码中定义
第一个成员变量为消息的type标识,用于标识发送方,第二个成员变量是一个数组,该数组内就存储要发送的数据信息。
msgget的第三个参数是发送的消息的长度,最后一个参数是用于标识阻塞还是非阻塞,正常情况下只需要将该参数设置为0即可。
#include #include #include ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);参数• msgid : 由 msgget 函数返回的消息队列标识码 • msgp :是⼀个指针,指针指向准备接收的消息 • msgsz :是 msgp 指向的消息⻓度,这个⻓度不含保存消息类型的那个long int⻓整型 • msgtype :它可以实现接收消息的类型,也可以模拟优先级的简单形式进⾏接收 • msgflg :控制着队列中没有相应类型的消息可供接收时将要发生的事 返回值• 成功返回实际放到接收缓冲区⾥⾥去的字符个数,失败返回 -1
以上就是消息队列当中进行数据接收的系统调用,以上函数的参数和msgsnd类似,只不过在函数当中多了一个参数msgtyp,该参数就是进行接受的时候要接收的消息类型。
2.3 消息队列demo代码
在以上当中我们就了解了基本的消息队列相关的系统调用,那么接下来就继续来使用以上的接口来实现一个两个进程间进行通信的代码。
在此创建三个文件,在文件MsgQueue.hpp当中实现系统的基本写入数据以及读取数据的接口,在client.cc当中实现向消息队列当中写入数据,在server.cc当中实现向消息队列当中读取数据。
实现的代码如下所示:
MsgQueue.hpp
#include #include #include #include #include //ket值使用的路径和项目标识符#define PATH \".\"#define PROJIN 0x666//默认的文件描述符和消息队列大小const int default_fd = -1;const int default_size = 1024;//两种消息队列的创建方式//GET_MSGQUEUE用于获取已存在的消息队列,GREAT_MSGQUEUE用于创建#define GET_MSGQUEUE (IPC_CREAT)#define GREAT_MSGQUEUE (IPC_CREAT | IPC_EXCL | 0666)//消息队列消息块类型标识#define MSG_TYPT_CLIENT 1#define MSG_TYPE_SERVER 2class MysQueue{public:// 消息块结构体 struct msgbuf { long mtype; char mtext[default_size]; }; MysQueue() : _msgfd(default_fd) { } ~MysQueue() { } //创建消息队列 void Create(int flag) { key_t key = ftok(PATH, PROJIN); if (key == -1) { perror(\"ftok\"); exit(1); } std::cout << \"ftok success!\" << std::endl; _msgfd = msgget(key, flag); if (_msgfd < 0) { perror(\"msgget\"); exit(1); } std::cout << \"msgget success!\" << std::endl; } // 发送消息 void Send(int type, const std::string text) { msgbuf msg; msg.mtype = type; memset(msg.mtext, 0, sizeof(msg.mtext)); memcpy(msg.mtext, text.c_str(), text.size()); int n = msgsnd(_msgfd, &msg, sizeof(msg.mtext), 0); if (n < 0) { perror(\"msgsnd\"); exit(1); } std::cout << \"send success!\" << std::endl; } // 接收消息 void Receive(int type, std::string &text) { msgbuf msg; int n = msgrcv(_msgfd, &msg, sizeof(msg.mtext), type, 0); // std::cout<<n<<std::endl; if (n < 0) { perror(\"msgrcv\"); exit(1); } std::cout << \"receive success!\" << std::endl; msg.mtext[n] = \'\\0\'; text = msg.mtext; } //销毁消息队列 void Distory() { int n = msgctl(_msgfd, IPC_RMID, nullptr); if (n < 0) { perror(\"msgctl\"); exit(1); } std::cout << \"msgctl success!\" << std::endl; }private: int _msgfd;};class MysQueue_serve : public MysQueue{public: MysQueue_serve() { MysQueue::Create(GET_MSGQUEUE); } ~MysQueue_serve() { }};class MysQueue_client : public MysQueue{public: MysQueue_client() { MysQueue::Create(GREAT_MSGQUEUE); } ~MysQueue_client() { MysQueue::Distory(); }};
server.cc
#include \"MsgQueue.hpp\"int main(){ MysQueue_serve mys; std::string text; while (1) { mys.Receive(MSG_TYPT_CLIENT, text); std::cout << \"Recive:\" << text << std::endl; if (text == \"exit\") { break; } } return 0;}
client.cc
#include\"MsgQueue.hpp\"int main(){ MysQueue_client mys; std::string text; while(1) { std::cout<>text; mys.Send(MSG_TYPT_CLIENT,text); if(text==\"exit\") { break; } } return 0;}
以上就实现使用消息队列进行两个进程之间的通信,那么接下来就运行以上的代码看是否符合我们的要求。
注:在Linux当中可以使用ipcs -q指令来查看系统当中存在的消息队列以及对应的属性信息。
例如以上的程序运行起来之后就可以使用ipcs来查询,结果如下所示
要删除对应的消息队列就可以使用ipcrm -q msqid
2.4 基于责任链模式消息队列
以上我们已经实现了使用消息队列实现基本的两进程之间的通信,但是接下来又有了新的需求就是1.client 发送给 server 的输入内容,拼接上时间,进程pid信息
2.server 收到的内容持久化保存到文件中
3.文件的内容如果过大,要进行切片保存并在指定的目录下打包保存,命令自定义
对于以上的要求解决方案:责任链模式
⼀种行为设计模式,它允许你将请求沿着处理者链进行传递。每个处理者都对请求进行检查,以决定是否处理它。如果处理者能够处理该请求,它就处理它;否则,它将请求传递给链中的下⼀个处理者。这个模式使得多个对象都有机会处理请求,从而避免了请求的发送者和接收者之间的紧耦合。
要理解以上的模式实际上我们可以类比一个生活当中的例子——公司请假流程:
1.普通员工请假 → 先交给 组长
2.如果天数太多组长批不了 → 转交 经理
3.经理还批不了 → 转交 总监
以上这样一个审批链就是责任链模式。
那么接下来就按照责任链的模式将以上实现的代码进行调整,实际上我们不需要修改已经实现的文件内的内容,只需要再创建一个handler.hpp的文件,在该文件内实现责任链各个节点对数据的处理即可。
在此整体代码的逻辑为每个节点称为处理器,在每个处理器当中都存在下一个处理器对于的指针,并且还需要存在一个变量用于标识当前的处理器在责任链当中是否运行。
1.基类实现
那么首先就创建一个所有处理器的基类HandlerText,实现代码如下所示:
//所有处理器的基类class HandlerText{public: HandlerText() : _enable(true) { } virtual ~HandlerText() = default; //各个处理器具体的执行函数 virtual void Execute(std::string &text) { } //是否调用处理器 bool IsEnable() { return _enable; } //启动处理器 void Enble() { _enable = true; } //关闭处理器 void DisEnble() { _enable = false; } //设置下一个处理器 void SetNextHandler(std::shared_ptr handler) { _next_handler = handler; }protected: //下一个处理器 std::shared_ptr _next_handler; //设置是否启用处理器 bool _enable;};
以上将处理器当中具体的执行函数设置为虚函数就可以让接下来HandlerText的子类进行重写。
2. 格式化实现
接下来实现的是对收到的消息进行格式化处理的代码
// 格式化处理class HandlerTextFormat : public HandlerText{public: void Execute(std::string &text) override { if (HandlerText::IsEnable()) { std::cout << \"1.开始格式化处理 Format…… \" << std::endl; std::stringstream ss; ss << time(nullptr) << \"-\" << getpid() << \"-\" << text <Execute(text); } else { std::cout << \"责任链节点结束,处理完成!\" << std::endl; } }};
3. 保存至文件
将得到的消息格式化处理之后接下来就将对应的消息保存到指定的文件内,实现代码如下所示:
//默认保存的路径和文件名std::string defaultpath = \"./tmp/\";std::string defaultfilename = \"test.log\";// 保存至文件当中class HandlerTextSaveFile : public HandlerText{public: HandlerTextSaveFile() : _filepath(defaultpath), _filename(defaultfilename) { //使用filesystem库检查路径是否存在 if (std::filesystem::exists(_filename)) { return; } try { //打开对应路径下的文件夹 std::filesystem::create_directories(_filepath); } catch (std::filesystem::filesystem_error &e) { std::cerr << e.what() << std::endl; } } ~HandlerTextSaveFile() { } void Execute(std::string &text) override { if (HandlerText::IsEnable()) { std::cout << \"2.保存至指定\" << _filename << \"文件当中,Save……\" << std::endl; std::string file = _filepath + _filename; std::ofstream out(file, std::ios::app); if (!out.is_open()) return; out <Execute(text); } else { std::cout << \"责任链节点结束,处理完成!\" << std::endl; } }private: //保存的文件路径和文件名 std::string _filepath; std::string _filename;};
4. 打包压缩文件
当存储消息的文件内的行数超出允许的最大值之后就将原来的文件重命名为加上对应的时间戳,并且在进行打包压缩为tgz文件,实现代码如下所示:
//文件当中保存的最大消息行数const int maxline = 5;// 备份文件class HandlerTextBackupFile : public HandlerText{public: HandlerTextBackupFile() : _max_line_number(maxline), _filename(defaultfilename), _filepath(defaultpath) { } ~HandlerTextBackupFile() { } //文件备份 void BackUp() { std::string newname = _filename + \".\" + std::to_string(time(nullptr)); //创建子进程来实现文件备份 pid_t pid = fork(); if (pid == 0) { // 子进程 chdir(_filepath.c_str()); //重命名文件 std::filesystem::rename(_filename, newname); std::string tarname = newname + \".tgz\"; std::cout << \"4.打包:\" << newname << \"成为:\" << tarname < 0 && WIFEXITED(status)) { std::string tmpfile = _filepath + newname; //删除备份文件,只保留打包之后的文件 std::filesystem::remove(tmpfile); } } void Execute(std::string &text) override { if (HandlerText::IsEnable()) { std::cout << \"3.进行文件备份,Backup……\" < _max_line_number) { std::cout << \"消息行超过\" << _max_line_number << \"行,触发文件备份\" <Execute(text); } else { std::cout << \"责任链节点结束,处理完成!\" << std::endl; } }private: int _max_line_number; std::string _filepath; std::string _filename;};
5. 责任链入口设计
在以上的当中我们已经将责任链当中各个处理器实现完毕,那么接下来就来实现用户调用责任链的接口类,在该类当中实现给用户提供选择要运行的处理器函数、启动责任链的函数。
实现代码如下所示:
//责任链入口类class HandlerEntry{public: HandlerEntry() { //设置好责任链当中的各个处理器 _format = std::make_shared(); _save = std::make_shared(); _backup = std::make_shared(); _format->SetNextHandler(_save); _save->SetNextHandler(_backup); } // 运行责任链 void Run(std::string &text) { _format->Execute(text); } //让用户启动要责任链当中运行的处理器 void EnableHandler(bool isformat, bool issave, bool isbackup) { isformat ? _format->Enble() : _format->DisEnble(); issave ? _save->Enble() : _save->DisEnble(); isbackup ? _backup->Enble() : _backup->DisEnble(); }private: //格式化处理器 std::shared_ptr _format; //保存处理器 std::shared_ptr _save; //备份处理器 std::shared_ptr _backup;};
注:以上责任链实现的代码当中使用到了一些C++11当中的语法,例如智能指针,还使用到了C++17当中的filesystem库,如果你之前为了解相关的知识建议先将这部分的知识了解看看。
6. 运行程序
将以上的代码都实现之后接下来就再实现对应的makefile文件,代码如下所示:
.PHONY:allall:Client ServerServer:Server.ccg++ -o $@ $^ -std=c++17Client:Client.ccg++ -o $@ $^ -std=c++17.PHONY:cleanclean:rm -rf Client Server tmp
程序运行效果如下所示:
实现了打包压缩的功能
3. System V信息量
3.1 临界资源相关概念
以上我们已经了解了进程间通信的两种方式共享内存和消息队列,那么接下来就再来了解本篇当中介绍的最后一个通信方式——信号量。
在正式的了解信号量相关的概念之前我们先来了解同步和互斥相关的概念。
在以上使用共享内存进行两个进程之间的通信的时候就可以发现当一个进程还在写数据的时候,另一个进程是可以直接将当前已经写入的数据进行读取的,那么这是就会出现读取到的数据可能只是真正要传输数据的一部分,这是就会出现数据不一致的问题。
那么以上的问题要如何解决呢?实际上出现数据不一致的问题核心就是读和写进程都能都是的访问对应的资源也就是共享内存,那么实际上只需要将让在同一时间内只有一个进程能访问对应的资源就可解决以上的问题。
在操作系统当中将多个进程能看到的公共的资源叫做共享资源,而在共享资源当中要进行保护的资源又称为临界资源,在进程中涉及到互斥资源的程序段叫临界区。你写的代码=访问临界资源的代码(临界区)+不访问临界资源的代码(非临界区)。
在实际的操作当中就需要对临界区进行加锁
了解了以上的概念之后接下来就可以来了解什么是同步、互斥。
当只有一个进程能在临界区当中时这是进行资源访问的进程之间就是互斥的关系。
就例如当我们在使用银行当中的ATM机时如果你进入到一个ATM机内部的时候再把门锁上之后其他人是无法进入到你所在的ATM机内部的,这是实际上你和其他人就形成了互斥的关系。
而同步类比到ATM机当中就是当你在使用ATM机的时候门外的人正常情况下是会在门口排队的,那么一个个有序的进入到ATM机当中就形成同步。
当多个进程访问临界资源的时候,具有一定的顺序性就叫做同步。
以上我们提到了在临界区当中要进行加锁,那么这时就有问题了,在锁在被访问的时候应该需要被保护啊,那么这时候是不是要给锁再加锁呢?
实际上是不要的,因为进行加锁和解锁的操作实际上是原子的,而具体锁原子性是如何实现的要等到之后线程的学习中再了解。
3.2 信号量概念
实际上信号量的本质就是一个计数器,来表明临界资源当中资源的多少。
以下可以通过一个例子来了解信号量的概念。
在电影院当中我们都知道如果要看一场电影那么就需要有对应的电影票才能进入到放映室当中,在一场电影当中又拥有多个座位,那么这时就可以将一场电影看作一个临界资源,而一个作为就是临界资源当中的一小部分资源。
而在电影院当中只要你买了对应的票,那么那场电影对应的座位就是你的了,无论你是否有真的区看电影。
在一场电影当中最怕出现的就是两个问题,一个是卖出的票数量比实际上座位的数量还多,另外一个就是卖出的票当中出现座位重复的情况。
实际上临界资源就类似一场电影,每一部分的临界资源就可以看作是电影放映室的一个座位,只要进程申请的对应的临界资源那么就算你没有使用这部分的资源也是属于对应的进程的。但是和以上提到的电影院当中最怕的两个问题类似,在进程申请临界资源的时候只要进程申请成功就需要将对应的信号量减少,但信号量的个数减到0的时候进程就无法再申请临界资源。
因此综上信号量就是对临界资源的预定机制。当要申请资源的时候,计数器++,进行P操作,进行资源释放的时候计数器--,进行V操作。
在临界资源的使用可能会出现以下的情况
一个是一次性将临界资源当中的所有资源一起申请,那么这就可以类比电影院当中的一个人的VIP放映室,而另外一种就是普通的一次性的申请一部分的临界资源,类比就是电影院当中的普通放映室。
其实信号量就是临界资源,而将那种只有0和1两种情况的信号量称为二元信号量。
这时你可能就会有疑惑了,为什么信号量没有实现两个进程之间的数据传输还能被定义为进程间通信的一种方式呢?
我们知道进程将通信的前提室让不同的进程看到同一份资源,这里信号量确实让不同的进程看到了,在此要了解到的是不是说只有进程之间的数据传输才是进程间的通信,除此之外进程之间的同步互斥以及通知等也算是进程之间的通信。
3.3 信号量接口
以下是信号量在系统当中的结构体
struct semaphore{int value;pointer_PCB queue;}
以上两个成员变量分别表示当前信号量当中可用资源的个数以及当前阻塞的进程队列。
当value > 0
:表示系统中有 value
个资源可以分配。value == 0
:表示资源刚好被占完,新的请求要等待。value < 0
:表示有 |value|
个进程正在等待(阻塞在这个信号量上)。
以下是信号量集合属性的结构体
struct ipc_perm { key_t __key; /* 用户调用 semget/shmget/msgget 时给的 key */ uid_t uid; /* 拥有者的有效 UID */ gid_t gid; /* 拥有者的有效 GID */ uid_t cuid; /* 创建者的有效 UID */ gid_t cgid; /* 创建者的有效 GID */ unsigned short mode; /* 权限位 (类似 0666) */ unsigned short __seq; /* 内核用的序列号 */};struct semid_ds { struct ipc_perm sem_perm; /* 拥有者和权限 */ time_t sem_otime; /* 最后一次 semop 时间 */ time_t sem_ctime; /* 最后一次变更时间 (semctl) */ unsigned long sem_nsems; /* 集合中的信号量个数 */};
以下是信号量当中创建的系统调用:
#include #include #include int semget(key_t key, int nsems, int semflg);参数:• key: 信号量集的键值,同消息队列和共享内存 • nsems: 信号量集中信号量的个数 • semflg: 同消息队列和共享内存 返回值:创建信号量成功返回的结果为创建成功的信号量的个数,否则返回-1.
以下是信号量控制相关的系统调用:
#include #include #include int semctl(int semid, int semnum, int cmd, ...);参数:semid :信号量集合的标识符(由 semget() 返回)。semnum :信号量集合中的某一个信号量编号(从 0 开始)。对整个集合操作时可忽略(如 IPC_RMID)。cmd :要执行的命令(控制功能)。arg :可选参数,类型是 union semun,不同 cmd 时有不同含义以下联合体需要自己实现:union semun { int val; /* 用于 SETVAL */ struct semid_ds *buf; /* 用于 IPC_STAT 和 IPC_SET */ unsigned short *array; /* 用于 GETALL 和 SETALL */ struct seminfo *__buf; /* 用于 IPC_INFO (Linux特有) */};
以下是一般cmd使用的命令
以下是进行信号量加减传统调用:
#include #include #include int semop(int semid, struct sembuf *sops, size_t nsops);参数:semid:信号量集合的 ID(semget 返回)。sops:指向一个 struct sembuf 数组,描述要进行的操作。nsops:sops 数组里元素的个数(一次可以执行多个操作,保证原子性)。在Linux当中以下结构体不需要自己实现:struct sembuf { unsigned short sem_num; /* 要操作的信号量编号 (0 ~ sem_nsems-1) */ short sem_op; /* 操作类型:负数= P (等待),正数= V (释放),0=等待为0 */ short sem_flg; /* 操作标志:IPC_NOWAIT, SEM_UNDO 等 */};
3.4 信号量demo代码
以上在了解了信号量对应的系统调用接口之后就可以试着使用以上的接口来实现一份demo代码,在此我们要实现的是使用二元信号量来实现显示器的交替打印
在此使用面向对象的方式,在sem.hpp实现信号量创建以及销毁等接口,之后再Write.cc文件当中只需要创建出信号量对象之后再使用P和V操作即可。
实现代码如下所示:
#pragma once#include #include #include #include #include #include #include const std::string pathname = \"/tmp\";int proj_id = 0x77;#define GET_SEM IPC_CREAT#define BUILD_SEM (IPC_CREAT | IPC_EXCL | 0666)//信号量class Semaaphore{public: Semaaphore(int semid, int flag) : _semid(semid), _flag(flag) { } ~Semaaphore() { if (_flag == GET_SEM) return; int n = semctl(_semid, 0, IPC_RMID); (void)n; std::cout << \"sem destory\" << std::endl; } //P操作 void P() { struct sembuf s; s.sem_op = -1; s.sem_num = 0; s.sem_flg = SEM_UNDO; int n = semop(_semid, &s, 1); (void)n; } //V操作 void V() { struct sembuf s; s.sem_op = 1; s.sem_num = 0; s.sem_flg = SEM_UNDO; int n = semop(_semid, &s, 1); (void)n; }private: int _semid; int _flag;};//信号量构造器class SemaphoreBulider{public: SemaphoreBulider() : _val(-1) { } ~SemaphoreBulider() { } //设置信号量初始值 SemaphoreBulider &SetVal(int val) { _val = val; return *this; } //构建信号量 std::shared_ptr Build(int flag) { if (_val < 0) { std::cerr << \"you must init val\" << std::endl; return nullptr; } key_t key = ftok(pathname.c_str(), proj_id); if (key < 0) { std::cerr << \"ftok cerror\" << std::endl; exit(1); } int semid = semget(key, 1, flag); if (semid < 0) { std::cerr << \"semget cerror\" << std::endl; exit(1); } if (flag == BUILD_SEM) { union semum { int val; struct semid_ds *buf; unsigned short *array; struct seminfo *_buf; } un; un.val = _val; //设置信号量初始值 int n = semctl(semid, 0, SETVAL, un); if (n < 0) { std::cerr << \"semctl cerror\" << std::endl; exit(2); } } return std::make_shared(semid, _val); }private://信号量初始值 int _val;};
#include \"sem.hpp\"#includeint main(){ SemaphoreBulider s; auto fsem = s.SetVal(1).Build(BUILD_SEM); pid_t pid = fork(); if (pid == 0) { auto csem = s.Build(GET_SEM); int cnt = 10; while (cnt--) { csem->P(); printf(\"C\"); fflush(stdout); sleep(1); printf(\"C\"); fflush(stdout); csem->V(); } } int cnt = 20; while (cnt--) { fsem->P(); printf(\"F\"); fflush(stdout); sleep(1); printf(\"F\"); fflush(stdout); fsem->V(); } waitpid(pid,nullptr,0); return 0;}
以上main函数当中我们让父子进程同时向显示器当中写入,在此是想让父进程写入两个字符之后子进程再写入两个字符那么如果在没有使用信号量时输出的字符是交错的
而有了二元信号量的约束之后就可以实现我们的要求。
接下来试着将以上的代码修改为建造者模式
建造者模式是一种 创建型设计模式,它的核心思想是:
👉将一个复杂对象的构建过程与它的表示分离,使得同样的构建过程可以创建不同的表示。
换句话说,建造者模式关注的是“一步步构造一个复杂对象”,而不是一次性 new 出来。
实现代码如下所示:
#ifndef SEM_HPP#define SEM_HPP#include #include #include #include #include #include #include const std::string SEM_PATH = \"/tmp\";const int SEM_PROJ_ID = 0x77;const int defaultnum = 1;#define GET_SEM (IPC_CREAT)#define BUILD_SEM (IPC_CREAT | IPC_EXCL)// 一个把整数转十六进制的函数std::string intToHex(int num){ char hex[64]; snprintf(hex, sizeof(hex), \"0x%x\", num); return std::string(hex);}// 产品类, 只需要关心自己的使用即可// 这里的Semaphore不是一个信号量!!而是一个信号量集合!!,要指明你要PV操作哪一个信号量!!class Semaphore{private: void PV(int who, int data) { struct sembuf sem_buf; sem_buf.sem_num = who; // 信号量编号,从0开始 sem_buf.sem_op = data; // S + sem_buf.sem_op sem_buf.sem_flg = SEM_UNDO; // 不关心 int n = semop(_semid, &sem_buf, 1); if (n < 0) { std::cerr << \"semop PV failed\" <= 0) { int n = semctl(_semid, 0, IPC_RMID); if (n < 0) { std::cerr << \"semctl IPC_RMID failed\" << std::endl; } std::cout << \"Semaphore \" << _semid << \" removed\" << std::endl; } }private: int _semid; };// 建造者接口class SemaphoreBuilder{public: virtual ~SemaphoreBuilder() = default; virtual void BuildKey() = 0; virtual void SetPermission(int perm) = 0; virtual void SetSemNum(int num) = 0; virtual void SetInitVal(std::vector initVal) = 0; virtual void Build(int flag) = 0; virtual void InitSem() = 0; virtual std::shared_ptr GetSem() = 0;};// 具体建造者类class ConcreteSemaphoreBuilder : public SemaphoreBuilder{public: ConcreteSemaphoreBuilder() {} virtual void BuildKey() override { // 1. 构建键值 std::cout << \"Building a semaphore\" << std::endl; _key = ftok(SEM_PATH.c_str(), SEM_PROJ_ID); if (_key < 0) { std::cerr << \"ftok failed\" << std::endl; exit(1); } std::cout << \"Got key: \" << intToHex(_key) << std::endl; } virtual void SetPermission(int perm) override { _perm = perm; } virtual void SetSemNum(int num) override { _num = num; } virtual void SetInitVal(std::vector initVal) override { _initVal = initVal; } virtual void Build(int flag) override { // 2. 创建信号量集合 int semid = semget(_key, _num, flag | _perm); if (semid < 0) { std::cerr << \"semget failed\" << std::endl; exit(2); } std::cout << \"Got semaphore id: \" << semid << std::endl; _sem = std::make_shared(semid); } virtual void InitSem() override { if (_num > 0 && _initVal.size() == _num) { // 3. 初始化信号量集合 for (int i = 0; i Id(), i, _initVal[i])) { std::cerr << \"Init failed\" << std::endl; exit(3); } } } } // 4. 获取信号量集合 virtual std::shared_ptr GetSem() override { return _sem; } private: bool Init(int semid, int num, int val) { union semun { int val; /* Value for SETVAL */ struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ unsigned short *array; /* Array for GETALL, SETALL */ struct seminfo *__buf; /* Buffer for IPC_INFO (Linux-specific) */ } un; un.val = val; int n = semctl(semid, num, SETVAL, un); if (n < 0) { std::cerr << \"semctl SETVAL failed\" << std::endl; return false; } return true; }private: key_t _key;// 信号量集合的键值 int _perm; // 权限 int _num; // 信号量集合的个数 std::vector _initVal; // 信号量集合的初始值 std::shared_ptr _sem; // 我们要创建的具体产品};// 指挥者类class Director{public: void Construct(std::shared_ptr builder, int flag, int perm = 0666, int num = defaultnum, std::vector initVal = {1}) { builder->BuildKey(); builder->SetPermission(perm); builder->SetSemNum(num); builder->SetInitVal(initVal); builder->Build(flag); if (flag == BUILD_SEM) { builder->InitSem(); } }};#endif // SEM_HPP
#include \"sem.hpp\"#includeint main(){ // 基于抽象接口类的具体建造者 std::shared_ptr builder = std::make_shared(); // 指挥者对象 std::shared_ptr director = std::make_shared(); // 在指挥者的指导下,完成建造过程 director->Construct(builder, BUILD_SEM, 0600, 3, {1, 2, 3}); // 完成了对象的创建的过程,获取对象 auto fsem = builder->GetSem(); srand(time(0) ^ getpid()); pid_t pid = fork(); // 我们期望的是,父子进行打印的时候,C或者F必须成对出现!保证打印是原子的. if (pid == 0) { director->Construct(builder, GET_SEM); auto csem = builder->GetSem(); while (true) { csem->P(0); printf(\"C\"); usleep(rand() % 95270); fflush(stdout); printf(\"C\"); usleep(rand() % 43990); fflush(stdout); csem->V(0); } } while (true) { fsem->P(0); printf(\"F\"); usleep(rand() % 95270); fflush(stdout); printf(\"F\"); usleep(rand() % 43990); fflush(stdout); fsem->V(0); } return 0;}