> 技术文档 > reactor以及百万服务器并发的实现_reactor写法

reactor以及百万服务器并发的实现_reactor写法


一、reactor的实现

零声教育

reator的优点

不同的文件描述符(fd)发生了不同的事件(event),就会自动分派到不同的回调函数(callback)去处理。
对于epoll来说,如果一个io发送大数据包以至于超过了buffer的大小,会导致多出的数据没办法存下来。
reactor做到了:不同的事件,做不同的action,并且每个io独立。

reactor.c代码

#include #include #include #include #include #include #include #include #include #include #include #define BUFFER_LENGTH 1024#define CONNECTION_SIZE 1024typedef int (*RCALLBACK)(int fd);int accept_cb(int fd);int recv_cb(int fd);int send_cb(int fd);int epfd = 0;struct conn {int fd;char rbuffer[BUFFER_LENGTH];int rlength;char wbuffer[BUFFER_LENGTH];int wlength;RCALLBACK send_callback;union {RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};struct conn conn_list[CONNECTION_SIZE] = {0};int set_event(int fd, int event, int flag) {if (flag) {struct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else {struct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}}int event_register(int fd, int event) {conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);conn_list[fd].wlength = 0;set_event(fd,event,1);}int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);printf(\"accept finished:%d\\n\", clientfd);event_register(clientfd,EPOLLIN);return 0;}int recv_cb(int fd) {int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count == 0) {printf(\"client disconnect: %d\\n\", fd);close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); //unfinishedreturn 0;}conn_list[fd].rlength = count;printf(\"RECV: %s\\n\", conn_list[fd].rbuffer);#if 1conn_list[fd].wlength = conn_list[fd].rlength;memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);#endifset_event(fd, EPOLLOUT, 0);return count;}int send_cb(int fd) {int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);set_event(fd, EPOLLIN, 0);return count;}int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0servaddr.sin_port = htons(port); //0-1023if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))) {printf(\"bind failed: %s\\n\", strerror(errno));}listen(sockfd, 10);printf(\"listen finished:%d\\n\", sockfd);return sockfd;}int main() {unsigned short port = 2000;int sockfd = init_server(port);epfd = epoll_create(1);conn_list[sockfd].fd = sockfd;conn_list[sockfd].r_action.recv_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);while (1) { //mainloopstruct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);}if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}}}}
  1. main()函数中,有这么一行conn_list[sockfd].r_action.recv_callback = accept_cb;刚开始时我认为应该是conn_list[sockfd].r_action.accept_callback = accept_cb,后来才知道是因为union 联合体里面所有成员共用一块内存,你赋值给哪个,实际上只是往这一块内存里写地址。也就是说用 recv_callback 和 accept_callback ,只是名字上的区分,本质就是同一块内存。所以写成recv_callback还是accept_callback都是一样的效果。
  2. int init_server(unsigned short port)的作用是初始化服务器,创建 socket,绑定本地地址和端口,然后进入监听状态并返回监听的fd,准备好接受新客户端连接。
  3. int set_event(int fd, int event, int flag)的作用是将某个fd注册到epoll或修改已经注册的事件,是epoll_ctl 的包装。
  4. int event_register(int fd, int event)的作用是初始化一个新连接的结构体,分配对应匹配的回调函数以及清空缓存区,并且把这个fd注册到epoll事件监控当中。
  5. int accept_cb(int fd)的作用是处理新客户端连接,当有新连接到来时被 epoll 触发,执行 accept,并对新 fd 做初始化。
  6. int recv_cb(int fd)的作用是接收客户端数据。用 recv() 从 fd 读取数据到读缓冲区,如果收到的数据长度是0,说明客户端断开,做清理(close() 并从 epoll 删除该 fd),如果收到数据,显示收到内容,拷贝收到的数据到写缓冲区(实现 echo 功能),修改 fd 的 epoll 事件为EPOLLOUT (准备回写数据)。
  7. int send_cb(int fd)的作用是发送数据给客户端。用 send() 把写缓冲区的数据写给客户端,修改 fd 的 epoll 事件为 EPOLLIN。

二、服务器百万并发

将上面reactor.c的代码中的CONNECTION_SIZE宏定义为1048576,用作服务器。

客户端代码mul_port_client_epoll.c

#include #include #include #include #include #include #include #include #include #include #include #include #include #define MAX_BUFFER128#define MAX_EPOLLSIZE(384*1024)#define MAX_PORT1#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)int isContinue = 0;static int ntySetNonblock(int fd) {int flags;flags = fcntl(fd, F_GETFL, 0);if (flags < 0) return flags;flags |= O_NONBLOCK;if (fcntl(fd, F_SETFL, flags) < 0) return -1;return 0;}static int ntySetReUseAddr(int fd) {int reuse = 1;return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));}int main(int argc, char **argv) {if (argc <= 2) {printf(\"Usage: %s ip port\\n\", argv[0]);exit(0);}const char *ip = argv[1];int port = atoi(argv[2]);int connections = 0;char buffer[128] = {0};int i = 0, index = 0;struct epoll_event events[MAX_EPOLLSIZE];int epoll_fd = epoll_create(MAX_EPOLLSIZE);strcpy(buffer, \" Data From MulClient\\n\");struct sockaddr_in addr;memset(&addr, 0, sizeof(struct sockaddr_in));addr.sin_family = AF_INET;addr.sin_addr.s_addr = inet_addr(ip);struct timeval tv_begin;gettimeofday(&tv_begin, NULL);int sockfd = 0;while (1) {if (++index >= MAX_PORT) index = 0;struct epoll_event ev;if (connections < 340000 && !isContinue) {sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror(\"socket\");goto err;}//ntySetReUseAddr(sockfd);addr.sin_port = htons(port+index);if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {perror(\"connect\");goto err;}ntySetNonblock(sockfd);ntySetReUseAddr(sockfd);sprintf(buffer, \"Hello Server: client --> %d\\n\", connections);send(sockfd, buffer, strlen(buffer), 0);ev.data.fd = sockfd;ev.events = EPOLLIN | EPOLLOUT;epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);connections ++;}//connections ++;if (connections % 1000 == 999 || connections >= 340000) {struct timeval tv_cur;memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));gettimeofday(&tv_begin, NULL);int time_used = TIME_SUB_MS(tv_begin, tv_cur);printf(\"connections: %d, sockfd:%d, time_used:%d\\n\", connections, sockfd, time_used);int nfds = epoll_wait(epoll_fd, events, connections, 100);for (i = 0;i < nfds;i ++) {int clientfd = events[i].data.fd;if (events[i].events & EPOLLOUT) {//sprintf(buffer, \"data from %d\\n\", clientfd);send(sockfd, buffer, strlen(buffer), 0);} else if (events[i].events & EPOLLIN) {char rBuffer[MAX_BUFFER] = {0};ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);if (length > 0) {//printf(\" RecvBuffer:%s\\n\", rBuffer);if (!strcmp(rBuffer, \"quit\")) {isContinue = 0;}} else if (length == 0) {printf(\" Disconnect clientfd:%d\\n\", clientfd);connections --;close(clientfd);} else {if (errno == EINTR || errno == EAGAIN || errno == ENOTSOCK) continue;printf(\" Error clientfd:%d, errno:%d\\n\", clientfd, errno);close(clientfd);}} else {printf(\" clientfd:%d, errno:%d\\n\", clientfd, errno);close(clientfd);}}}usleep(500);}return 0;err:printf(\"error : %s\\n\", strerror(errno));return 0;}

第一次运行

出现too many open files问题

在这里插入图片描述

左边是一个服务器,右边是三个客户端,可以看出,当连接数超过1024时,出现了错误:too many open files。

解决方法

在这里插入图片描述
客户端键入ulimit -a后,显示open files:1024,键入ulimit -n 1048576将open files改为1048576,不仅客户端要修改,服务端也要修改。

第二次运行

reactor.c编译不通过

在这里插入图片描述
可以明显看出,代码中struct conn conn_list[CONNECTION_SIZE] = {0}定义了一个非常大的全局(静态)数组或者结构体数组,这会在全局数据段(静态区/BSS段)分配一大块内存,比如上百万个结构体,每个2KB左右,总共会占用2GB甚至更多的空间。编译器/链接器(尤其是ld)对单个全局变量的大小有限制,通常2GB左右,超出就会报错。下面尝试通过molloc的方法来解决。malloc是 C 语言中用来“向操作系统申请一块内存”的函数,英文全称是 Memory Allocation(分配内存),用法是malloc(想要的字节数),申请到的内存是“堆”上的内存,可以随便用、用完还可以归还,返回的是“新分配的内存的首地址”。

解决方法

服务端键入gcc -o reactor reactor.c -mcmodel=medium。

这是GCC在x86_64(64位)架构下的一个内存模型参数,它调整了编译器对静态数据段(BSS/Data段)的寻址方式,可以支持比默认更大的全局变量。x86_64平台有不同的内存模型,-mcmodel=medium让全局和静态变量寻址空间变大,允许你分配比默认更大的静态全局数据(比如大于2GB)。默认是-mcmodel=small,全局静态变量寻址有限(一般最大2GB)。用-mcmodel=medium,全局静态数据可以很大(只要物理内存和虚拟地址空间支持)

第三次运行

出现客户端本地端口耗尽的问题

在这里插入图片描述

单个端口(如服务器的2000端口)理论上可以同时支持成千上万个连接,实际能建立多少连接取决于操作系统的资源限制,比如文件描述符数量、TCP本地端口可用范围等。当出现‘Cannot assign requested address’等错误时,往往是客户端本地端口或其它资源耗尽了,而不是服务器端口本身的限制。
此外,因为每条打印过于耗时,所以每一千个连接打印一条信息以及其所耗的时间。

解决方法

把服务器的端口从一个(2000)增加到20个。

reactor.c代码

#include #include #include #include #include #include #include #include #include #include #include #define BUFFER_LENGTH 1024#define CONNECTION_SIZE 1048576#define MAX_PORTS 20#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)typedef int (*RCALLBACK)(int fd);int accept_cb(int fd);int recv_cb(int fd);int send_cb(int fd);int epfd = 0;struct timeval begin;struct conn {int fd;char rbuffer[BUFFER_LENGTH];int rlength;char wbuffer[BUFFER_LENGTH];int wlength;RCALLBACK send_callback;union {RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};struct conn conn_list[CONNECTION_SIZE] = {0};int set_event(int fd, int event, int flag) {if (flag) {struct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else {struct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}}int event_register(int fd, int event) {if (fd < 0)return -1;conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);conn_list[fd].wlength = 0;set_event(fd,event,1);}int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);//printf(\"accept finished:%d\\n\", clientfd);if (clientfd < 0) {printf(\"accept errno: %d --> %s\\n\", errno, strerror(errno));return -1;}event_register(clientfd, EPOLLIN);if ((clientfd % 1000) == 0) {struct timeval current;gettimeofday(&current, NULL);int time_used = TIME_SUB_MS(current, begin);memcpy(&begin, &current, sizeof(struct timeval));printf(\"accept finished: %d, time_used: %d\\n\", clientfd, time_used);}return 0;}int recv_cb(int fd) {int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count == 0) {printf(\"client disconnect: %d\\n\", fd);close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); //unfinishedreturn 0;}conn_list[fd].rlength = count;//printf(\"RECV: %s\\n\", conn_list[fd].rbuffer);#if 1conn_list[fd].wlength = conn_list[fd].rlength;memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);#endifset_event(fd, EPOLLOUT, 0);return count;}int send_cb(int fd) {int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);set_event(fd, EPOLLIN, 0);return count;}int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0servaddr.sin_port = htons(port); //0-1023if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))) {printf(\"bind failed: %s\\n\", strerror(errno));}listen(sockfd, 10);//printf(\"listen finished:%d\\n\", sockfd);return sockfd;}int main() {unsigned short port = 2000;epfd = epoll_create(1);int i = 0;for (i = 0;i < MAX_PORTS;i ++) {int sockfd = init_server(port + i);conn_list[sockfd].fd = sockfd;conn_list[sockfd].r_action.recv_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}gettimeofday(&begin, NULL);while (1) { //mainloopstruct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);}if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}}}}

第四次运行

在运行前,每台虚拟机键入sudo vim /etc/security/limits.conf,在文件底部添加内容如下图所示,中间的空是tab键,没有空格键。
在这里插入图片描述

出现客户端连接超时的问题

在这里插入图片描述

解决方法(改eth0)

ifconfig查看服务器虚拟机的ip地址,然后键入sudo vim /etc/default/grub ,修改为下图中的样子:
在这里插入图片描述
系统的引导加载器grub配置文件,GRUB_CMDLINE_LINUX 参数被修改为\"net.ifnames=0 biosdevname=0\",这禁用了新的网络接口命名规则(如 ens33、enp0s3),使用传统的网络接口名称(如 eth0、eth1)。
随后键入sudo update-grub,更新后键入sudo vim /etc/netplan/00-installer-config.yaml ,修改内容为下图:
在这里插入图片描述
配置了一个名为 eth0 的网络接口,dhcp4: true: 使用 DHCP 自动获取 IPv4 地址,version: 2: 使用 Netplan 配置格式版本 2。
最后键入sudo shutdown -h now关机,关机后增加一个网络适配器:NAT。
一张网卡用于SSH连接,另一张网卡用于跑百万服务器。

第五次运行

运行成功

在这里插入图片描述
经过一段时间的等待,总连接数跑到了102W,实现了百万并发。

三、总结reactor.c的执行流程

从main()函数入手:

unsigned short port = 2000;

定义一个无符号短整型的变量port用来存储端口号2000,无符号(unsigned)只能表示0和正数,短整型(short)只能表示2字节的整数。

epfd = epoll_create(1);

创建一个epoll实例并返回一个管理epoll实例的文件描述符epfd。

int i = 0;for (i = 0;i < MAX_PORTS;i ++) {int sockfd = init_server(port + i);conn_list[sockfd].fd = sockfd;conn_list[sockfd].r_action.recv_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}

对于每个端口,调用init_server函数并返回对应的sockfd,下面是init_server对应的代码:

int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0servaddr.sin_port = htons(port); //0-1023if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))) {printf(\"bind failed: %s\\n\", strerror(errno));}listen(sockfd, 10);//printf(\"listen finished:%d\\n\", sockfd);return sockfd;}

在init_server中,创建一个IPV4 TCP套接字,返回一个sockfd;创建一个类型为struct sockaddr_in的结构体变量servaddr,将这个变量的地址族设置为IPV4,服务器监听所有网络接口并指定服务器监听的端口号,通过bind()将套接字(socket) 和 IP地址、端口号 绑定在一起,通过listen()将套接字设置为监听状态,准备接收客户端的连接请求,返回sockfd。
再回到main()函数当中,拿着返回的fd,将结构体数组conn_list的索引号与返回的sockfd的值对应上并且将conn_list中的成员联合体r_action中的recv_callback函数指针指向accept_cb函数的地址。
随后进入到set_event函数中:

int set_event(int fd, int event, int flag) {if (flag) {struct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else {struct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}}

定义一个类型为struct epoll_event的结构体变量ev,设置ev当中的事件以及对应的fd并且通过epoll_ctl将这些通过epfd添加epoll实例当中。
回到main函数:

gettimeofday(&begin, NULL);

获取当前时间并存储到 begin 变量

while (1) { //mainloopstruct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);}if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}}}

创建一个类型为struct epoll_event的结构体变量events,定义了一个存储最多1024个事件的数组,并将每个事件的字段初始化为0。通过epoll_wait等待指定的 epoll 实例上的事件发生,将触发的事件按顺序从第一个元素开始写入 events 数组,并返回触发的事件数。for循环遍历已经写入events的所有元素。对于循环内部,将事件对应的fd赋值给connfd,如果事件是可读,执行前面设置好的accept_cb,传入参数connfd,对于accept_cb函数:

int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);//printf(\"accept finished:%d\\n\", clientfd);if (clientfd < 0) {printf(\"accept errno: %d --> %s\\n\", errno, strerror(errno));return -1;}event_register(clientfd, EPOLLIN);if ((clientfd % 1000) == 0) {struct timeval current;gettimeofday(&current, NULL);int time_used = TIME_SUB_MS(current, begin);memcpy(&begin, &current, sizeof(struct timeval));printf(\"accept finished: %d, time_used: %d\\n\", clientfd, time_used);}return 0;}

accept建立连接并返回clientfd,调用event_register并传入参数clientfd,EPOLLIN:

int event_register(int fd, int event) {if (fd < 0)return -1;conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);conn_list[fd].wlength = 0;set_event(fd,event,1);}

依旧是将conn_list数组的索引对应fd,并且把recv_callback和send_callback函数指针指向recv_cb函数和send_cb函数的地址,清空fd对应的rbuffer、wbuffer并重置rlength、wlength,通过set_event函数将引入的fd和事件添加到epoll实例当中。
回到accept_cb当中,每一千条连接打印连接数已经连接所用的时间。
再回到主程序的while循环当中,如果是可写事件,通过函数指针send_callback回调函数send_cb,并传入参数connfd:

int send_cb(int fd) {int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);set_event(fd, EPOLLIN, 0);return count;}

将connfd对应的wbuffer中的长度为wlength的数据发送到指定的fd当中,并且返回成功发送的字节数count,通过sent_event将此fd的事件从可写改为可读,返回count,再次返回到main()函数重复执行while循环。