【Linux | 网络】多路转接IO之select

- 一、select函数
- 二、select的优缺点
- 三、实现select服务器(只关心读事件)
-
- 3.1 Log.hpp(日志)
- 3.2 Lockguard.hpp(自动管理锁)
- 3.3 Socket.hpp(封装套接字)
- 3.4 SelectServer.hpp(服务端封装)
- 3.5 Main.cpp(服务端)
- 结尾
一、select函数
#include int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
功能:用于同时监控多个文件描述符的 IO 状态,通过 select,程序可以在单个线程中处理多个 IO 事件,避免为每个 IO 操作创建单独的线程,从而提高资源利用率和并发性能。但是select只负责等待,拷贝需要read、recv、write、send等负责。
参数:
- nfds:需要检查的最大文件描述符值 + 1
- readfds:输入输出型参数,fd_set本质上是一张位图
- 输入时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:用户告诉内核,是否监控对应文件描述符的读事件
- 输出时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:内核告诉用户,对应文件描述符的读事件 是否就绪
- 输入时:
- writefds:输入输出型参数,fd_set本质上是一张位图
- 输入时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:用户告诉内核,是否监控对应文件描述符的写事件
- 输出时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:内核告诉用户,对应文件描述符的写事件 是否就绪
- 输入时:
- exceptfds:输入输出型参数,fd_set本质上是一张位图
- 输入时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:用户告诉内核,是否监控对应文件描述符的异常事件
- 输出时:
- 比特位的位置:对应文件描述符的值
- 比特位的内容:内核告诉用户,对应文件描述符的异常事件 是否就绪
- 输入时:
- timeout:超时时间,控制 select 的阻塞行为:
- NULL:永久阻塞,直到有文件描述符就绪
- 0:立即返回(非阻塞模式)
- >0:指定超时时间(秒 + 微秒),超时后返回。

返回值:
- 正数:表示就绪的文件描述符总数
- 0:表示超时(无文件描述符就绪)
- -1:表示错误,并设置 errno(如 EINTR 表示被信号中断)
操作系统不建议用户直接修改fd_set位图,所以操作系统提供以下宏操作,来操作fd_set位图。
FD_ZERO(fd_set *set); // 清空集合FD_SET(int fd, fd_set *set); // 将 fd 添加到集合FD_CLR(int fd, fd_set *set); // 从集合中移除 fdFD_ISSET(int fd, fd_set *set); // 检查 fd 是否在集合中(就绪时返回非零)
二、select的优缺点
- 优点
- select只负责等待,可以等待多个文件描述符,在IO的时候效率会比较高
- 缺点
- 使用select的时候,用户每次都需要对select的参数进行重置
- 在编写代码的时候,select需要用到第三方数组,会充满遍历,可能会影响select的效率
- 用户到内核,内核到用户,每次select的调用和返回,都需要对位图进行重置操作;用户和内核之间,需要一直进行数据拷贝
- select会让操作系统在底层遍历要关心的所有文件描述符,会导致效率降低
- fd_set是操作系统提供的一个类型,本身是一个位图,fd_set的大小是固定的,也就是fd_set的比特位位数是有上线的,所以select能够检测文件描述符的个数也是有限的
三、实现select服务器(只关心读事件)
3.1 Log.hpp(日志)
#pragma once#include \"LockGuard.hpp\"#include #include #include #include #include #include #include #include #include #include using namespace std;// 日志等级enum{ Debug = 0, // 调试 Info, // 正常 Warning, // 警告 Error, // 错误,但程序并未直接退出 Fatal // 程序直接挂掉};enum{ Screen = 10, // 打印到显示器上 OneFile, // 打印到一个文件中 ClassFile // 按照日志等级打印到不同的文件中};string LevelToString(int level){ switch (level) { case Debug: return \"Debug\"; case Info: return \"Info\"; case Warning: return \"Warning\"; case Error: return \"Error\"; case Fatal: return \"Fatal\"; default: return \"Unknow\"; }}const char *default_filename = \"log.\";const int default_style = Screen;const char *defaultdir = \"log\";class Log{public: Log() : style(default_style), filename(default_filename) { // mkdir(defaultdir,0775); pthread_mutex_init(&_log_mutex, nullptr); } void SwitchStyle(int sty) { style = sty; } void WriteLogToOneFile(const string &logname, const string &logmessage) { int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666); if (fd == -1) return; { LockGuard lockguard(&_log_mutex); write(fd, logmessage.c_str(), logmessage.size()); } close(fd); } void WriteLogToClassFile(const string &levelstr, const string &logmessage) { mkdir(defaultdir, 0775); string name = defaultdir; name += \"/\"; name += filename; name += levelstr; WriteLogToOneFile(name, logmessage); } void WriteLog(int level, const string &logmessage) { switch (style) { case Screen: { LockGuard lockguard(&_log_mutex); cout << logmessage; } break; case OneFile: WriteLogToClassFile(\"All\", logmessage); break; case ClassFile: WriteLogToClassFile(LevelToString(level), logmessage); break; default: break; } } string GetTime() { time_t CurrentTime = time(nullptr); struct tm *curtime = localtime(&CurrentTime); char time[128]; // localtime 的年是从1900开始的,所以要加1900, 月是从0开始的所以加1 snprintf(time, sizeof(time), \"%d-%d-%d %d:%d:%d\", curtime->tm_year + 1900, curtime->tm_mon + 1, curtime->tm_mday, curtime->tm_hour, curtime->tm_min, curtime->tm_sec); return time; return \"\"; } void LogMessage(int level, const char *format, ...) { char left[1024]; string Levelstr = LevelToString(level).c_str(); string Timestr = GetTime().c_str(); string Idstr = to_string(getpid()); snprintf(left, sizeof(left), \"[%s][%s][%s] \", Levelstr.c_str(), Timestr.c_str(), Idstr.c_str()); va_list args; va_start(args, format); char right[1024]; vsnprintf(right, sizeof(right), format, args); string logmessage = left; logmessage += right; WriteLog(level, logmessage); va_end(args); } ~Log() { pthread_mutex_destroy(&_log_mutex); };private: int style; string filename; pthread_mutex_t _log_mutex;};Log lg;class Conf{public: Conf() { lg.SwitchStyle(Screen); } ~Conf() { }};Conf conf;
3.2 Lockguard.hpp(自动管理锁)
#pragma once#include class Mutex{public: Mutex(pthread_mutex_t* lock) :pmutex(lock) {} void Lock() { pthread_mutex_lock(pmutex); } void Unlock() { pthread_mutex_unlock(pmutex); } ~Mutex() {}public: pthread_mutex_t* pmutex;};class LockGuard{public: LockGuard(pthread_mutex_t* lock) :mutex(lock) { mutex.Lock(); } ~LockGuard() { mutex.Unlock(); }public: Mutex mutex;};
3.3 Socket.hpp(封装套接字)
#pragma once#include #include #include #include #include #include #include #include #include #define CONV(addrptr) (struct sockaddr*)addrptrenum{ Socket_err = 1, Bind_err, Listen_err};const static int defalutsockfd = -1;const int defalutbacklog = 5;class Socket{public: virtual ~Socket(){}; virtual void CreateSocketOrDie() = 0; virtual void BindSocketOrDie(uint16_t port) = 0; virtual void ListenSocketOrDie(int backlog) = 0; virtual Socket* AcceptConnection(std::string* ip , uint16_t* port) = 0; virtual bool ConnectServer(const std::string& serverip , uint16_t serverport) = 0; virtual int GetSockFd() = 0; virtual void SetSockFd(int sockfd) = 0; virtual void CloseSockFd() = 0; virtual bool Recv(std::string& buffer,int size) = 0; virtual void Send(const std::string& send_string) = 0;public: void BuildListenSocketMethod(uint16_t port,int backlog = defalutbacklog) { CreateSocketOrDie(); BindSocketOrDie(port); ListenSocketOrDie(backlog); } bool BuildConnectSocketMethod(const std::string& serverip , uint16_t serverport) { CreateSocketOrDie(); return ConnectServer(serverip,serverport); } void BuildNormalSocketMethod(int sockfd) { SetSockFd(sockfd); }};class TcpSocket : public Socket{public: TcpSocket(int sockfd = defalutsockfd) :_sockfd(sockfd) {} ~TcpSocket(){}; void CreateSocketOrDie() override { _sockfd = ::socket(AF_INET,SOCK_STREAM,0); if(_sockfd < 0) exit(Socket_err); } void BindSocketOrDie(uint16_t port) override { struct sockaddr_in addr; memset(&addr,0,sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = INADDR_ANY; addr.sin_port = htons(port); socklen_t len = sizeof(addr); int n = ::bind(_sockfd,CONV(&addr),len); if(n < 0) exit(Bind_err); } void ListenSocketOrDie(int backlog) override { int n = ::listen(_sockfd,backlog); if(n < 0) exit(Listen_err); } Socket* AcceptConnection(std::string* clientip , uint16_t* clientport) override { struct sockaddr_in client; memset(&client,0,sizeof(client)); socklen_t len = sizeof(client); int fd = ::accept(_sockfd,CONV(&client),&len); if(fd < 0) return nullptr; char buffer[64]; inet_ntop(AF_INET,&client.sin_addr,buffer,len); *clientip = buffer; *clientport = ntohs(client.sin_port); Socket* s = new TcpSocket(fd); return s; } bool ConnectServer(const std::string& serverip , uint16_t serverport) override { struct sockaddr_in server; memset(&server,0,sizeof(server)); server.sin_family = AF_INET; // server.sin_addr.s_addr = inet_addr(serverip.c_str()); inet_pton(AF_INET,serverip.c_str(),&server.sin_addr); server.sin_port = htons(serverport); socklen_t len = sizeof(server); int n = connect(_sockfd,CONV(&server),len); if(n < 0) return false; else return true; } int GetSockFd() override { return _sockfd; } void SetSockFd(int sockfd) override { _sockfd = sockfd; } void CloseSockFd() override { if(_sockfd > defalutsockfd) { close(_sockfd); } } bool Recv(std::string& buffer , int size)override { char inbuffer[size]; int n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0); if(n > 0) { inbuffer[n] = 0; } else { return false; } buffer += inbuffer; return true; } void Send(const std::string& send_string) { send(_sockfd,send_string.c_str(),send_string.size(),0); }private: int _sockfd;};
3.4 SelectServer.hpp(服务端封装)
#pragma once#include #include #include #include \"Socket.hpp\"#include \"Log.hpp\"#include using namespace std;const static uint16_t defalutport = 8888;const static int gbacklog = 8;const static int num = sizeof(fd_set) * 8;class SelectServer{private: void HandlerEvent(fd_set &rfds) { for (int i = 0; i < num; i++) { // 是否监控 if (!_rfds_array[i]) continue; // 是否就绪 int fd = _rfds_array[i]->GetSockFd(); if (FD_ISSET(fd, &rfds)) { // 是新连接到来,还是新数据到来 // 新连接到来 if (FD_ISSET(_listensock->GetSockFd(), &rfds)) { lg.LogMessage(Info, \"get a new link\\n\"); string clientip; uint16_t cilentport; // 由于select已经检测到listensock已经就绪了,这里不会阻塞 Socket *sock = _listensock->AcceptConnection(&clientip, &cilentport); if (!sock) { lg.LogMessage(Error, \"accept error\\n\"); continue; } lg.LogMessage(Info, \"get a client , client info# %s %d , fd:%d\\n\", clientip.c_str(), cilentport, sock->GetSockFd()); // 这里已经获取连接成功,由于底层数据不一定就绪 // 所以这里需要将新连接的文件描述符交给select托管 // 只需将文件描述符加入到_rfds_array即可 int pos = 0; for (; pos < num; pos++) { if (_rfds_array[pos] == nullptr) { _rfds_array[pos] = sock; break; } } // fd_set能够存储的文件描述符是有上限的 if(pos == num) { sock->CloseSockFd(); delete sock; lg.LogMessage(Warning, \"server is full...!\\n\"); } } else { // 是新数据来了 // 这里读是有问题的 string buffer; bool flag = _rfds_array[i]->Recv(buffer,1024); if(flag) // 读取成功 { lg.LogMessage(Info,\"client say# %s\\n\",buffer.c_str()); } else // 读取失败 { lg.LogMessage(Warning,\"cilent quit !! close fd : %d\\n\",fd); _rfds_array[i]->CloseSockFd(); delete _rfds_array[i]; _rfds_array[i] = nullptr; } } } } }public: SelectServer(uint16_t port = defalutport) : _port(port), _listensock(new TcpSocket()), _isrunning(false) { } void Init() { _listensock->BuildListenSocketMethod(_port, gbacklog); for (int i = 0; i < num; i++) { _rfds_array[i] = nullptr; } _rfds_array[0] = _listensock.get(); } void Loop() { _isrunning = true; while (_isrunning) { fd_set rfds; FD_ZERO(&rfds); PrintDebug(); // 设置需要监控的读事件文件描述符集,并找到最大的文件描述符 int max_fd = _listensock->GetSockFd(); for (int i = 0; i < num; i++) { if (_rfds_array[i] == nullptr) continue; else { int fd = _rfds_array[i]->GetSockFd(); // rfds本质是一个输入输出型参数,rfds是在select调用返回的时候,不断被修改,所以,每次都要重置 FD_SET(fd, &rfds); if (max_fd < fd) { max_fd = fd; } } } struct timeval timeout = {0, 0}; ssize_t n = select(max_fd + 1, &rfds, nullptr, nullptr, &timeout); switch (n) { case -1: { lg.LogMessage(Fatal, \"select Error , last time : %u.%u\\n\", timeout.tv_sec, timeout.tv_usec); break; } case 0: { lg.LogMessage(Info, \"select timeout... , last time : %u.%u\\n\", timeout.tv_sec, timeout.tv_usec); break; } default: { lg.LogMessage(Info, \"select success , begin handler event , last time : %u.%u\\n\", timeout.tv_sec, timeout.tv_usec); HandlerEvent(rfds); break; } } } _isrunning = false; } void Stop() { _isrunning = false; } // 查看当前哪些文件描述符需要被监控 void PrintDebug() { std::cout << \"current select rfds list is : \"; for (int i = 0; i < num; i++) { if (_rfds_array[i] == nullptr) continue; else std::cout << _rfds_array[i]->GetSockFd() << \" \"; } std::cout << std::endl; } ~SelectServer() {}private: unique_ptr<Socket> _listensock; uint16_t _port; bool _isrunning; // Select服务器需要所有的fd以数据结构的方式组织起来 Socket *_rfds_array[num];};
3.5 Main.cpp(服务端)
#include #include #include \"SelectServer.hpp\"using namespace std;// ./selectserver portint main(int argc , char* argv[]){ if(argc != 2) { cout << \"Usage : \" << argv[0] << \" port\" << endl; exit(0); } uint16_t localport = stoi(argv[1]); unique_ptr<SelectServer> svr = make_unique<SelectServer>(localport); svr->Init(); svr->Loop(); return 0;}
结尾
如果有什么建议和疑问,或是有什么错误,大家可以在评论区中提出。
希望大家以后也能和我一起进步!!🌹🌹
如果这篇文章对你有用的话,希望大家给一个三连支持一下!!🌹🌹



