【仿Mudou库one thread per loop式并发服务器实现】SERVER服务器模块实现_one loop per thread
SERVER服务器模块实现
- 1. Buffer模块
- 2. Socket模块
- 3. Channel模块
- 4. Poller模块
- 5. EventLoop模块
-
- 5.1 TimerQueue模块
- 5.2 TimeWheel整合到EventLoop
- 5.1 EventLoop与线程结合
- 5.2 EventLoop线程池
- 6. Connection模块
- 7. Acceptor模块
- 8. TcpServer模块
1. Buffer模块
Buffer模块:缓冲区模块
提供的功能:存储数据,取出数据
实现思想:
- 实现缓冲区得有一块内存空间,采用vector,vector底层其实使用的就是一个线性的内存空间。不用string的原因是因为,网络传输数据什么都有而string在读取遇 ‘\\0’ 就停止了。
- 要素:
- 默认的空间大小
- 当前的读取数据位置
- 当前的写入数据位置
- 操作:
a. 写入数据:
当前写入位置指向哪里,就从哪里开始写入。
如果后续剩余空闲空间不够了,考虑整体缓冲区空闲空间是否足够(因为读位置也会向后偏移,前边有可能会有空闲空间)。
足够:将数据移动到起始位置即可
不够:扩容,从当前写位置开始扩容足够大小
数据一旦写入成功,当前写位置,就要向后偏移
b. 读取数据:
当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读
可读数据大小:当前写入位置,减去当前读取位置
// 缓存区Buffer模块#define BUFFER_SIZE 1024class Buffer{private: std::vector<char> _buffer; // 使⽤vector进⾏内存空间管理 uint64_t _read_idx; // 读偏移 uint64_t _write_idx; // 写偏移public: Buffer() : _read_idx(0), _write_idx(0), _buffer(BUFFER_SIZE) {} // 获得数组起始位置,注意并不是对象的位置 // void* Begin() char *Begin() { return &(*_buffer.begin()); } // 获取当前写⼊起始地址, _buffer的空间起始地址,加上写偏移量 // void* ReadPos() char *WritePosition() { return Begin() + _write_idx; } // 获取当前读取起始地址 // void* WritePos() char *ReadPosition() { return Begin() + _read_idx; } // 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移 uint64_t TailIdleSize() { return _buffer.size() - _write_idx; // 这里也可以用_buffer.size() - _write_idx // 因为vector构造使用的函数即开辟空间也初始化了 // 也就是说size()和capacity()是一样的大小 // 后续也用resize()会调整空间 } // 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间 uint64_t HeadIdleSize() { return _read_idx; } // 获取可读数据⼤⼩ = 写偏移 - 读偏移 uint64_t ReadAbleSize() { return _write_idx - _read_idx; } // 确保可写空间⾜够(整体空闲空间够了就移动数据,否则就扩容) void EnsureWriteSpace(uint64_t len) { // 如果末尾空闲空间⼤⼩⾜够,直接返回 if (TailIdleSize() >= len) return; // 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够, 够了就将数据移动到起始位置 if (HeadIdleSize() + TailIdleSize() >= len) { // 将数据移动到起始位置// 把当前数据大小先保存起来 uint64_t size = ReadAbleSize(); // 把可读数据拷贝到起始位置 std::copy(ReadPosition(), ReadPosition() + size, Begin()); // 将读偏移归0 _read_idx = 0; // 将写位置置为可读数据大小,因为当前的可读数据大小就是写偏移量 _write_idx = size; } else { // 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可 _buffer.resize(_write_idx + len); } } // 将读偏移向后移动 void MoveReadOffset(uint64_t len) { if (len == 0) return; _read_idx += len; } // 将写偏移向后移动 void MoveWriteOffset(uint64_t len) { //向后移动的大小,必须小于当前后边的空闲空间大小 assert(len <= TailIdleSize()); _write_idx += len; } // 写入数据 void Write(const void *date, uint64_t len) { // 1. 保证有足够空间,2. 拷⻉数据进去 if (len == 0) return; EnsureWriteSpace(len); // void* 不能解引用 和 ++ const char *d = (const char *)date; //copy是一个模板函数,类型需要匹配,因此上面读写位置返回都是char* std::copy(d, d + len, WritePosition()); } void WriteAndPush(const void *date, uint64_t len) { Write(date, len); MoveWriteOffset(len); } void WriteString(const std::string &data) { return Write(data.c_str(), data.size()); } void WriteStringAndPush(const std::string &data) { WriteString(data); MoveWriteOffset(data.size()); } void WriteBuffer(Buffer &data) { return Write(data.ReadPosition(), data.ReadAbleSize()); } void WriteBufferAndPush(Buffer &data) { WriteBuffer(data); MoveWriteOffset(data.ReadAbleSize()); } // 读取数据 void Read(void *buff, uint64_t len) { // 要求要获取的数据大小必须⼩于可读数据大小 assert(len <= ReadAbleSize()); std::copy(ReadPosition(), ReadPosition() + len, (char *)buff); } void ReadAndPop(void *buff, uint64_t len) { Read(buff, len); MoveReadOffset(len); } std::string ReadString(uint64_t len) { //要求要获取的数据大小必须小于可读数据大小 assert(len <= ReadAbleSize()); std::string str; str.resize(len); // str.c_str()返回的是const char* Read(&str[0], len); return str; } std::string ReadStringAndPop(uint64_t len) { assert(len <= ReadAbleSize()); std::string str = ReadString(len); MoveReadOffset(len); return str; } char *FindCRLE() { char *p = (char *)memchr(ReadPosition(), \'\\n\', ReadAbleSize()); return p; } // 通常获取⼀⾏数据,这种情况针对是 std::string GetLine() { char *p = FindCRLE(); if (p == nullptr) return \"\"; return ReadString(p - ReadPosition() + 1); //+是为了把\\n也读出来 } std::string GetLineAndPop() { std::string str = GetLine(); MoveReadOffset(str.size()); return str; } void Clear() { _read_idx = 0; _write_idx = 0; }};
这里补充一个按照日志等级打印的模块,方便后面调试。顺便把这个项目用到的所有头文件都写出来。
#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // 日志打印#define NORMAL 0#define DEBUG 1#define WARRING 2#define FATAL 3// 根据等级打印日志#define LOG(level, format, ...) \\ { \\ if (level >= FATAL) \\ { \\ time_t t = time(NULL); \\ struct tm *m = localtime(&t); \\ char ts[32] = {0}; \\ strftime(ts, 31, \"%H:%M:%S\", m); \\ fprintf(stdout, \"[%p %s %s:%d]\" format \"\\n\", (void*)pthread_self(),ts, __FILE__, __LINE__, ##__VA_ARGS__); \\ } \\ }// ... 表示可变参数,可以传0个、1个、多个// _VA_ARGS__ 将...的内容原封不动抄到__VA_ARGS__位置// ##__VA_ARGS__ ...没有参数,就移除这个_VA_ARGS_,相对于没有这个东西#define LOG_NORMAL(format, ...) LOG(NORMAL, format, ##__VA_ARGS__)#define LOG_DEBUG(format, ...) LOG(DEBUG, format, ##__VA_ARGS__)#define LOG_WARRING(format, ...) LOG(WARRING, format, ##__VA_ARGS__)#define LOG_FATAL(format, ...) LOG(FATAL, format, ##__VA_ARGS__)
2. Socket模块
Socket模块:对socket套接字操作进行分资
功能:
- 创建套接字
- 绑定地址信息
- 开始监听
- 向服务器发起连接
- 获取新连接
- 接收数据
- 发送数据
- 关闭套接字
- 创建一个服务端连接
- 创建一个客户端连接
- 设置套接字选项—开启地址端口重用
- 设置套接字阻塞属性-设置为非阻塞
// 套接字Socket模块#define LISTEN_SIZE 1024class Socket{private: int _sockfd;public: Socket() : _sockfd(-1) {} Socket(int fd) : _sockfd(fd) {} ~Socket() { Close(); } int Fd() { return _sockfd; } // 创建套接字 bool Create() { _sockfd = socket(AF_INET, SOCK_STREAM, 0); if (_sockfd < 0) { LOG_FATAL(\"CREATE SOCKET FAILED!!\"); return false; } return true; } // 绑定ip+port bool Bind(const uint16_t &port) { struct sockaddr_in peer; memset(&peer, 0, sizeof peer); peer.sin_family = AF_INET; peer.sin_port = htons(port); peer.sin_addr.s_addr = INADDR_ANY; // 任意地址绑定 if (bind(_sockfd, (sockaddr *)&peer, sizeof peer) < 0) { LOG_FATAL(\"BIND ADDRESS FAILED!\"); return false; } return true; } // 开始监听 bool Listen(int backlog = LISTEN_SIZE) { if (listen(_sockfd, backlog) < 0) { LOG_FATAL(\"SOCKET LISTEN FAILED!\"); return false; } return true; } // 向服务器发起连接 bool Connect(const uint16_t &port, const std::string ip) { struct sockaddr_in peer; memset(&peer, 0, sizeof peer); peer.sin_family = AF_INET; peer.sin_port = htons(port); peer.sin_addr.s_addr = inet_addr(ip.c_str()); if (connect(_sockfd, (sockaddr *)&peer, sizeof peer) < 0) { LOG_FATAL(\"CONNECT SERVER FAILED!\"); return false; } return true; } // 获取连接 int Accpet() { // 不关注是那个ip和port发起的连接 int newfd = accept(_sockfd, nullptr, nullptr); if (newfd < 0) { LOG_DEBUG(\"SOCKET ACCEPT FAILED!\"); return -1; } return newfd; } // 阻塞读取数据 ssize_t Recv(void *buffer, size_t len, int flag = 0) { ssize_t ret = recv(_sockfd, buffer, len, flag); if (ret <= 0) { // EAGAIN 当前socket的接收缓冲区中没有数据了,在⾮阻塞的情况下才会有这个错误 // EINTR 表⽰当前socket的阻塞等待,被信号打断了, if (errno == EAGAIN || errno == EINTR) { return 0; // 表⽰这次接收没有接收到数据 } LOG_FATAL(\"SOCKET RECV FAILED!\"); return -1; } return ret; // 实际接收的数据长度 } // 非阻塞读取数据 ssize_t NonBlockRecv(void *buffer, size_t len) { return Recv(buffer, len, MSG_DONTWAIT); // MSG_DONTWAIT 表⽰当前接收为非阻塞 } // 发出数据 ssize_t Send(const void *buffer, size_t len, int flag = 0) { ssize_t ret = send(_sockfd, buffer, len, flag); if (ret < 0) { if (errno == EAGAIN || errno == EINTR) { return 0; } LOG_FATAL(\"SOCKET SEND FAILED!\"); return -1; } return ret; // 实际发送的数据长度 } // 非阻塞发出数据 ssize_t NonBlockSend(const void *buffer, size_t len) { if(len == 0) return 0; return Send(buffer, len, MSG_DONTWAIT); // MSG_DONTWAIT 表⽰当前接收为非阻塞 } // 关闭套接字 void Close() { if (_sockfd != -1) { close(_sockfd); _sockfd = -1; } } // 创建一个服务端连接 bool CreateServer(const uint16_t &port,bool block_flag = false) { // 1.创建套接字 if (Create() == false) return false; // 2.设置非阻塞 if(block_flag) NonBlock(); // 3.绑定 if (Bind(port) == false) return false; // 4.监听 if (Listen() == false) return false; // 5.开启地址端口复用 ReuseAddress(); return true; } // 创建一个客服端连接 bool CreateClient(const uint16_t &port, const std::string ip) { // 1.创建套接字 if (Create() == false) return false; // 2.发起连接 if (Connect(port, ip) == false) return false; return true; } // 设置套接字选项--开启地址端口复用 void ReuseAddress() { int opt = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof opt); } // 设置套接字属性 -- 非阻塞 void NonBlock() { int fl = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, fl | O_NONBLOCK); }};
3. Channel模块
Channel类设计
目的:对描述符的监控事件管理
- 成员:
因为后边使用epoll进行事件监控,
EPOLLIN 可读
EPOLLOUT 可写
EPOLLRDHUP 连接断开
EPOLLPRI 优先数据
EPOLLERR 出错了
EPOLLHUP 挂断
而以上的事件都是一个数值uint32_t进行保存,要进行事件管理,就需要有一个uint32t类型的成员保存当前需要监控的事件。
事件处理这里,因为有五种事件需要处理,就需要五个回调函数。
功能:
-
事件管理:
描述符是否可读
描述符是否可写
对描述符监控可读
对描述符监控可写
解除可读事件监控
解除可写事件监控
解除所有事件监控 -
事件触发后的处理的管理
a.需要处理的事件:可读,可写,挂断,错误,任意
b.事件处理的回调函数
// 描述符事件管理Channel模块class EventLoop;class Channel{private: int _fd; //当前需要监控的文件描述符 EventLoop *_loop; //放在自己所在的EventLoop监控 uint32_t _events; // 当前需要监控的事件 uint32_t _revents; // 当前连接触发的事件 using EventCallback = std::function<void()>; EventCallback _read_callback; // 可读事件被触发的回调函数 EventCallback _write_callback; // 可写事件被触发的回调函数 EventCallback _error_callback; // 错误事件被触发的回调函数 EventCallback _close_callback; // 连接断开事件被触发的回调函数 EventCallback _event_callback; // 任意事件被触发的回调函数public: // Channel(Poller *poller,int fd) : _fd(fd), _events(0), _revents(0), _poller(poller) {} Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {} int Fd() { return _fd; } uint32_t Events() // 获得想要监控的事情 { return _events; } void SetREvents(uint32_t events) // 设置实际就绪的事情 { _revents = events; } void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; } void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; } void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; } void SetEventCallback(const EventCallback &cb) { _event_callback = cb; } // 当前是否监控了可读 bool ReadAble() { return _events & EPOLLIN; } // 当前是否监控了可写 bool WriteAble() { return _events & EPOLLOUT; } // 启动读事件监控 void EnableRead() { _events |= EPOLLIN; Update(); } // 启动写事件监控 void EnableWrite() { _events |= EPOLLOUT; Update(); } // 关闭读事件监控 void DisableRead() { _events &= ~EPOLLIN; Update(); } // 关闭写事件监控 void DisableWrite() { _events &= ~EPOLLOUT; Update(); } // 关闭所有事件监控,文件描述符还在红黑树上,但是没有事件让它监控 void DisableAll() { _events = 0; Update(); } // 移除监控,文件描述符从红黑树中删除 void Remove(); // 添加到监控 void Update(); // 事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事情如何处理由调用它的人决定 void HandleEvent() { // 读事件发现连接出现问题,是不去释放连接的,因为我们还看看是否还有数据待发送 // 等把数据都发送完了,或者发送出错了,再去关闭连接 // 写/出错/挂断 都会关闭连接,我们只需要关闭一次就好了//关于把任意事件回调放在最后,在测试阶段会有说明。 if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { // if (_event_callback) // _event_callback(); if (_read_callback) _read_callback(); } // 有可能会释放连接的操作事件,⼀次只处理⼀个 if (_revents & EPOLLOUT) { // if (_event_callback) // _event_callback(); if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { // if (_event_callback) // _event_callback(); if (_error_callback) _error_callback(); } else if (_revents & EPOLLHUP) { // if (_event_callback) // _event_callback(); if (_close_callback) _close_callback(); } if (_event_callback) { _event_callback(); } }};//这里先不做接释,等到了EventLoop在细说。void Channel::Remove(){ return _loop->RemoveEvent(this);}void Channel::Update(){ return _loop->UpdateEvent(this);}
客户端异常退出,比如断电断网,客户端底层操作系统没有办法及时服务器进行四次挥手,因此服务器也收不到任何信息,除非服务器主动去读或者写,客户端会给服务器发送一个RST,服务器读写的返回值都是-1,错误码设置为ECOMNNRESET,并且在epoll模型下的服务器会触发EPOLLERR和EPOLLHUOP事件。
客户端正常调用close或者ctrl+c终止进程,客户端操作系统有机会和服务器进行四次挥手,read返回值是0,如果发送缓存区还有数据可能首次发送回成功,第二次再次发送返回值是-1,并且会触发SIGPIPE信号,如果服务器不对这个信号做处理,就会导致服务器奔溃退出。因此我们还需要对这个信号进行特殊处理。
/*避免服务器因为给断开连接的客⼾端进⾏send触发异常导致程序崩溃,因此忽略SIGPIPE信号*//*定义静态全局是为了保证构造函数中的信号忽略处理能够在程序启动阶段就被直接执⾏*/class NetWork{ public: NetWork() { LOG_DEBUG(\"SIGPIPE INIT\"); signal(SIGPIPE,SIG_IGN); }};static NetWork nw;
4. Poller模块
Poller模块:描述符IO事件监控模块
意义:通过epoll实现对描述符的lO事件监控
功能:
- 添加/修改描述符的事件监控(不存在则添加,存在则修改)
- 移除描述符的事件监控
封装思想:
- 必须拥有一个epoll的操作句柄
- 拥有一个structepoll_event结构数组,监控时保存所有的活跃事件
- 使用hash表管理描述符与描述符对应的事件管理Channel对象
逻辑流程:
- 对描述符进行监控,通过Channel才能知道描述符需要监控什么事件
- 当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才能知道什么事件如何处理)当描述符就绪了,返回就绪描述符对应的Channel
// 描述符I/O事件监控Poller模块#define MAX_EPOLLEVEVTS 1024class Poller{private: int _epfd; struct epoll_event _evs[MAX_EPOLLEVEVTS]; std::unordered_map<int, Channel *> _channels;private: // 对epoll直接操作 void Update(Channel *channel, int op) { int fd = channel->Fd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->Events(); int ret = epoll_ctl(_epfd, op, fd, &ev); if (ret < 0) { LOG_WARRING(\"EPOLLCTL FAILED!\"); } return; }public: Poller() { _epfd = epoll_create(MAX_EPOLLEVEVTS); if (_epfd < 0) { LOG_FATAL(\"EPOLL CREATE FAILED!\"); abort();//退出程序 } } // 添加/修改监控事件 void UpdateEvent(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it == _channels.end()) { // 不存在则添加 _channels.insert(std::make_pair(channel->Fd(), channel)); return Update(channel, EPOLL_CTL_ADD); } // 存在则修改 return Update(channel, EPOLL_CTL_MOD); } // 移除监控 void RemoveEvent(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it != _channels.end()) { _channels.erase(it); } Update(channel, EPOLL_CTL_DEL); } // 开始监控,返回就绪事件 void Poll(std::vector<Channel *> *active) { int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVEVTS, -1); if (nfds < 0) { if (errno == EINTR) { return; } LOG_FATAL(\"EPOLL WAIT ERROR:%s\\n\", strerror(errno)); abort();//退出程序 } for (int i = 0; i < nfds; ++i) { auto it = _channels.find(_evs[i].data.fd); assert(it != _channels.end()); it->second->SetREvents(_evs[i].events); // 设置实际就绪的事件 active->push_back(it->second); } return; }};
5. EventLoop模块
首先要先介绍一下eventfd,eventfd是一种事件通知机制。创建一个描述符用于实现事件通知。eventfd本质在内核里边管理的就是一个计数器。创建eventfd就会在内核中创建一个计数器(结构)。
每当向evenfd中写入一个数值—用于表示事件通知次数,可以使用read进行数据的读取,读取到的数据就是通知的次数。
假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,
再去read读取出来的数字就是3,读取之后计数清0。
用处:在EventLoop模块中实现线程间的事件通知功能
#include#include#include#include/*int eventfd(unsigned int initval, int flags);功能:创建一个eventfd对象,实现事件通知参数:initval:计数初值flags:EFD_CLOEXEC-禁止进程复制EFDNONBLOCK-启动非阻塞属性返回值:返回一个文件描述符用于操作eventfd也是通过read/write/close进行操作的。注意点:read&write进行IO的时候数据只能是一个8字节数据*/int main(){ int evfd = eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK); if(evfd < 0) { std::cout<<\"EVENTFD FAIL!\"<<std::endl; abort(); } uint64_t len = 1; int ret = write(evfd,&len,sizeof len); ret = write(evfd,&len,sizeof len); ret = write(evfd,&len,sizeof len); if(ret < 0) { std::cout<<\"WRITE FAIL!\"<<std::endl; abort(); } uint64_t val; ret = read(evfd,&val,sizeof val); if(ret < 0) { std::cout<<\"WRITE FAIL!\"<<std::endl; abort(); } std::cout<<val<<std::endl; return 0;}
EventLoop:进行事件监控,以及事件处理的模块
关键点:这个模块与线程是一一对应关联的,一个EventLoop对应一个线程。
监控了一个连接,而这个连接一旦就绪,就要进行事件处理。但是如果这个描述符,在多个EventLoop线程中都触发了事件,然后进行处理,就会存在线程安全问题。就比如说即时通信,当某个EventLoop管理的连接触发了读事件,然后要把这条消息发送给所有在线的连接,此时就会涉及到多个EventLoop线程,就有线程安全的问题。如果不想加锁怎么办,毕竟加锁解锁有一定的消耗。不加锁怎么保证多线程之间线程安全问题?
我们可以将一个EventLoop管理所有连接的事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行!
如何保证一个连接的所有操作都在EventLoop对应的线程中?
解决方案:给EventLoop模块中,添加一个任务队列
对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列中。
EventLoop处理流程:
- 在线程中对描述符进行事件监控
- 有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中)
- 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务一一执行
这样能够保证对于连接的所有操作,都是在一个线程中进行的,不涉及线程安全问题但是对于任务队列的操作有线程安全的问题,只需要给task的操作加一把锁即可。
这里还有一个注意点:当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作:这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。
- 如果执行的操作本就在线程中,不需要将操作压入队列了,可以直接执行
- 如果执行的操作不再线程中,才需要加入任务池,等到事件处理完了然后执行任务
上面是整个项目最核心的知识点。
EventLoop模块的成员:
- 事件监控
使用Poller模块,有事件就绪则进行事件处理 - 执行任务队列中的任务
一个线程安全的任务队列
注意点:
因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西,能够唤醒事件监控的阻塞,上面eventfd在这里就派上用场了。
// EvevtLoop模块,对事件进行监控,处理class EventLoop{private: using Functor = std::function<void()>; std::thread::id _thread_id; // EvevtLoop对应的线程ID int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞 Channel _event_channel; // 对_event_fd做事件管理 Poller _poller; // 对所有事件进行监控 std::vector<Functor> _tasks; // 任务池 std::mutex _mutex; // 实现任务池操作的线程安全public: // 执行任务池中所有任务 void RunAllTask() { // 为了不让每次拿任务都加锁解锁,因此一次把任务都拿出来 // 这也是不用队列做任务池的原因 std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); _tasks.swap(functor); } // 依次执行任务 for (auto &f : functor) { f(); } } static int CreateEventFd() { int efd = eventfd(0, O_CLOEXEC | O_NONBLOCK); if (efd < 0) { LOG_FATAL(\"CREATE EVENTFD FAILED!!\"); abort(); } return efd; } void ReadEvent() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof res); if (ret <= 0) { if (errno == EINTR || errno == EWOULDBLOCK) { return; } LOG_FATAL(\"READ EVENTFD FAILED!!\"); abort(); } } void WeakUpEventfd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof val); if (ret < 0) { if (errno == EINTR) { return; } LOG_FATAL(\"WRITE EVENTFD FAILED!!\"); abort(); } }public: EventLoop() : _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(this, _event_fd), _time_wheel(this) { // 给eventfd添加可读事件的回调,就绪了,读取eventfd事件通知次数 _event_channel.SetReadCallback(std::bind(&EventLoop::ReadEvent, this)); // 启动eventfd读事件监控 _event_channel.EnableRead(); //std::cout<<\"_event_fd: \"<<_event_fd<<std::endl; }; // 1.事件监控 2.就绪事件处理 3.执行任务 void Statr() { while(1) { // 1.事件监控 std::vector<Channel *> active; _poller.Poll(&active); // 2.就绪事件处理 for (auto &channel : active) { channel->HandleEvent(); } // 3.执行任务 RunAllTask(); } } // 判断当前线程是否是EventLoop对应的线程 bool IsInLoop() { return _thread_id == std::this_thread::get_id(); } // 判断将要执行的任务是否处于当前线程,如果是则执行,不是则压入队列 void RunInLoop(const Functor &cb) { if (IsInLoop()) { return cb(); } return QueueInLoop(cb); } void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); } // 将操作压入任务池 void QueueInLoop(const Functor &cb) { { std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(cb); } // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞 // 其实就是给eventfd写入一个数据,eventfd就会触发可读事件 WeakUpEventfd(); } // 添加/修改描述符的事件监控 void UpdateEvent(Channel *channel) { _poller.UpdateEvent(channel); } // 移除描述符监控 void RemoveEvent(Channel *channel) { _poller.RemoveEvent(channel); }};
5.1 TimerQueue模块
一个EventLoop模块里面也有一个TimerQueue对象用于定时任务的管理,里面放着每个连接的非活跃超时销毁的任务。这里有个问题,如何让整个程序启动之后,时间轮一秒走一步呢?
可以将时间轮和Linux下的定时器time_create和time_settime整合在一起。
定时器模块的整合:
timerfd:实现内核每隔一段时间,给进程一次超时事件(timerfd可读)
timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务
要实现一个完整的秒级定时器,就需要将这两个功能整合到一起
timerfd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimertask,执行一下所有的过期定时任务。
// 定时器模块using TaskFunc = std::function<void()>;using ReleaseFunc = std::function<void()>;class TimerTask{private: uint64_t _id; // 定时器任务对象ID uint32_t _timeout; // 定时任务的超时时间 bool _cancel; // 定时任务是否取消,false不取消,true取消 TaskFunc _task_cb; // 定时器对象要执行的定时任务 ReleaseFunc _release_cb; /// 用于删除TimerWheel中保存的定时器对象信息 public: TimerTask(uint64_t id, uint32_t time,const TaskFunc& cb) : _id(id), _timeout(time), _task_cb(cb), _cancel(false) {} ~TimerTask() { if (_cancel == false) { _task_cb(); } _release_cb(); } uint32_t DelayTime() { return _timeout; } void SetRelease(const ReleaseFunc &rb) { _release_cb = rb; } void Cancel() { _cancel = true; }};class TimeWheel{private: using WeakPtr = std::weak_ptr<TimerTask>; using SharedPtr = std::shared_ptr<TimerTask>; int _tick; // 滴答指针,当前的秒,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务 int _capacity; // 表盘最大数量---其实就是最大延迟时间 std::vector<std::vector<SharedPtr>> _wheel; // 时间轮 std::unordered_map<uint64_t, WeakPtr> _timers; // 通过id找到任务对象,方便后续刷新任务 // unordered_map第二个参数不能用shared_ptr,如果还用用shared_ptr去指向原始对象,那么就和之前用shared_ptr指向原始对象 // 而产生的引用计数根本就不是同一个引用计数,这样如果unordered_map第二个参数引用计数是1,而其他指向它引用计数是2 // 万一unordered_map第二个参数shared_ptr被释放了,这个原始对象就会被释放,这是由问题的. // 如何保证定时器的滴答指针,在程序启动之后,每秒走一次 // 与linux下得到定时器time_create和time_settime结合一起用 // time_settime超时时间间隔设置为1秒,每秒都会给timefd写超时次数 // 用EventLoop把timefd管理起来,启动读事件监控,一写读就绪,然后就让滴答指针往后走一步 EventLoop *_loop; int _timerfd; // 定时器描述符--可读事件回调就是读取计数器,执行定时任务 Channel _timerfd_channel; // // 释放时间轮上任务对象 void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return; } _timers.erase(it); } static int CreateTimerFd() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if (timerfd < 0) { LOG_FATAL(\"CREATE TIMEFD FAILED!!\"); abort(); } struct itimerspec item; item.it_value.tv_sec = 1; item.it_value.tv_nsec = 0; // 第一次超时时间 item.it_interval.tv_sec = 1; item.it_interval.tv_nsec = 0; // 第⼀次之后的超时间隔时间 timerfd_settime(timerfd, 0, &item, nullptr); return timerfd; } int ReadTimeFd() { //有可能因为其他描述符的事件处理花费事件⽐较⻓,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次 //read读取到的数据times就是从上⼀次read之后超时的次数 uint64_t times = 0; int ret = read(_timerfd, ×, 8); if (ret < 0) { LOG_FATAL(\"READ TIMERFD FAILED!!\"); abort(); } //std::cout<<\"cnt:\"<<times<<std::endl; return times; } // 这个函数应该每秒钟被执行一次,相当于秒针向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉 _wheel[_tick].clear(); // 执行任务,释放shared_ptr指针,当引用计数到0,调用Task析构,执行定时任务 } void OneTime() { //根据实际超时的次数,执⾏对应的超时任务 int times = ReadTimeFd(); for (int i = 0; i < times; ++i) { RunTimerTask(); } } // 设置定时任务 void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) { SharedPtr spt(new TimerTask(id, delay, cb)); spt->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id)); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(spt); _timers[id] = WeakPtr(spt); } // 刷新/延迟定时任务 void TimerRefreshInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return;//没找着定时任务,没法刷新,没法延迟 } // spt是临时对象,出栈后会自动销毁,不会增加引用计数 SharedPtr spt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr int delaytime = spt->DelayTime(); int pos = (_tick + delaytime) % _capacity; _wheel[pos].push_back(spt); } // 取消定时任务,等时间到了,释放任务对象,但是析构里面的超时任务不去执行 void TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return;//没找着定时任务,没法刷新,没法延迟 } // spt是临时对象,出栈后会自动销毁,不会增加引用计数 SharedPtr spt = it->second.lock(); //如果没有if判断,超时先去执行的释放任务,然后才删除unordered_map管理的对象, //释放任务里面有如果有定时任务就去取消这一步,因为对象还没有删除,所有会进来这个函数 //但TimerTask对象正在析构的时候,没有if判断 就去执行spt->Cancel()程序就会奔溃 if(spt) spt->Cancel(); }public: TimeWheel(EventLoop *loop) : _tick(0), _capacity(60), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerFd()), _timerfd_channel(_loop, _timerfd) { // timerfd读事件就绪回调函数 _timerfd_channel.SetReadCallback(std::bind(&TimeWheel::OneTime, this)); _timerfd_channel.EnableRead(); // 启动timerfd读事件监控 } ~TimeWheel() {} // 定时器中有个_timers成员,定时器信息的操作有可能在多个EventLoop线程中进行,因此需要考虑线程安全的问题 // 但是我们又不想加锁,那就把对定时器的所有操作,都放在对应的线程中进行 // 一个线程都是串行化执行的,那就不存在线程安全问题了 void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb); void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); // 虽然这个接口存在线程安全问题,这个接口实际上不能被外界使用者调用,只能在模块EventLoop线程内执行 bool HasTimer(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; }};
5.2 TimeWheel整合到EventLoop
// EvevtLoop模块,对事件进行监控,处理class EventLoop{private: using Functor = std::function<void()>; std::thread::id _thread_id; // 线程ID int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞 Channel _event_channel; // 对_event_fd做事件管理 Poller _poller; // 对所有事件进行监控 std::vector<Functor> _tasks; // 任务池 std::mutex _mutex; // 实现任务池操作的线程安全 TimeWheel _time_wheel; // 定时器模块public: // 执行任务池中所有任务 void RunAllTask() { // 为了不让每次拿任务都加锁解锁,因此一次把任务都拿出来 // 这也是不用队列做任务池的原因 std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); _tasks.swap(functor); } // 依次执行任务 for (auto &f : functor) { f(); } } static int CreateEventFd() { int efd = eventfd(0, O_CLOEXEC | O_NONBLOCK); if (efd < 0) { LOG_FATAL(\"CREATE EVENTFD FAILED!!\"); abort(); } return efd; } void ReadEvent() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof res); if (ret <= 0) { if (errno == EINTR || errno == EWOULDBLOCK) { return; } LOG_FATAL(\"READ EVENTFD FAILED!!\"); abort(); } } void WeakUpEventfd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof val); if (ret < 0) { if (errno == EINTR) { return; } LOG_FATAL(\"WRITE EVENTFD FAILED!!\"); abort(); } }public: EventLoop() : _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(this, _event_fd), _time_wheel(this) { // 给eventfd添加可读事件的回调,就绪了,读取eventfd事件通知次数 _event_channel.SetReadCallback(std::bind(&EventLoop::ReadEvent, this)); // 启动eventfd读事件监控 _event_channel.EnableRead(); //std::cout<<\"_event_fd: \"<<_event_fd<<std::endl; }; // 1.事件监控 2.就绪事件处理 3.执行任务 void Statr() { while(1) { // 1.事件监控 std::vector<Channel *> active; _poller.Poll(&active); // 2.就绪事件处理 for (auto &channel : active) { channel->HandleEvent(); } // 3.执行任务 RunAllTask(); } } // 判断当前线程是否是EventLoop对应的线程 bool IsInLoop() { return _thread_id == std::this_thread::get_id(); } // 判断将要执行的任务是否处于当前线程,如果是则执行,不是则压入队列 void RunInLoop(const Functor &cb) { if (IsInLoop()) { return cb(); } return QueueInLoop(cb); } void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); } // 将操作压入任务池 void QueueInLoop(const Functor &cb) { { std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(cb); } // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞 // 其实就是给eventfd写入一个数据,eventfd就会触发可读事件 WeakUpEventfd(); } // 添加/修改描述符的事件监控 void UpdateEvent(Channel *channel) { _poller.UpdateEvent(channel); } // 移除描述符监控 void RemoveEvent(Channel *channel) { _poller.RemoveEvent(channel); } // 添加定时任务 void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _time_wheel.TimerAdd(id, delay, cb); } // 刷新定时任务 void TimerRefresh(uint64_t id) { return _time_wheel.TimerRefresh(id); } // 删除定时任务 void TimerCancel(uint64_t id) { return _time_wheel.TimerCancel(id); } // 是否有某个定时任务存在 bool HasTimer(uint64_t id) { return _time_wheel.HasTimer(id); }};//Chaneel模块之前只有EventLoop的声明,没有内部函数。因此需要放在EventLoop后面,编译才不会报错void Channel::Remove(){ return _loop->RemoveEvent(this);}void Channel::Update(){ return _loop->UpdateEvent(this);}//同理TimeWheel模块也是一样void TimeWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb){ _loop->RunInLoop(std::bind(&TimeWheel::TimerAddInLoop, this, id, delay, cb));}void TimeWheel::TimerRefresh(uint64_t id){ _loop->RunInLoop(std::bind(&TimeWheel::TimerRefreshInLoop, this, id));}void TimeWheel::TimerCancel(uint64_t id){ _loop->RunInLoop(std::bind(&TimeWheel::TimerCancelInLoop, this, id));}
5.1 EventLoop与线程结合
目标:将EventLoop模块与线程整合起来
EventLoop模块与线程是——对应的。
EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id,
而后边当运行一个连接操作的时候判断当前是否运行在连接自己的所在EventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的thread_id进行一个比较,相同就表示在同一个线程,不同就表示当前运行线程并不是自己这个连接所在EventLoop线程。
上面的含义:EventLoop模块在实例化对象的时候,必须在线程内部
EventLoop实例化对象时会设置自己的thread_id,如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置。
存在问题:在构造EventLoop对象,到设置新的thread_id期间将是不可控的。因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象
造一个新的模块:LoopThread
这个模块的功能:将EventLoop与thread整合到一起
思想:
- 创建线程
- 在线程中实例化EventLoop对象
功能:可以向外部返回所实例化的EventLoop
//一个EventLoop一个线程,将EventLoop和线程整合在一起class LoopThread{ private: //互斥锁+条件变量,用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop std::mutex _mutex;//互斥锁 std::condition_variable _cond;//条件变量 EventLoop* _loop;//EventLoop指针变量,这个对象需要在线程内部实例化 std::thread _thread;//EventLoop对应的线程 private: //实例化EventLoop对象,唤醒_cond有可能阻塞的线程,并且开始运行EventLoop模块的功能 void ThreadEntry() { //由LoopThread管理这个变量的生命周期 EventLoop loop; { std::unique_lock<std::mutex> _mtx(_mutex); _loop = &loop; _cond.notify_all(); } loop.Statr(); } public: //创建线程,设定线程入口函数 LoopThread():_loop(nullptr),_thread(std::thread(&LoopThread::ThreadEntry,this)){} //返回当前线程关联的EventLoop对象的指针,得到EventLoop指针相对于找到这个线程,把连接绑定到对应EventLoop线程中 EventLoop* GetLoop() { EventLoop* loop = nullptr; { std::unique_lock<std::mutex> _mtx(_mutex); _cond.wait(_mtx,[&](){ return _loop != nullptr;});//_loop为nullptr就一直阻塞 loop = _loop; } return loop; }};
5.2 EventLoop线程池
针对LoopThread设计一个线程池:
LoopThreadPool模块:对所有的LoopThread进行管理及分配
功能:
- 线程数量可配置(0个或多个)
注意事项:在服务器中,主从Reactor模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理
因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器,一个线程及负责获取连接,也负责连接的处理 - 对所有的线程进行管理,其实就是管理0个或多个LoopThread对象
- 提供线程分配的功能
当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理
假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理
假设有多个从属线程,则采用RR轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)
//针对LoopThread做一个线程池class LoopThreadPoll{ private: int _thread_count;//从属线程的数量 int _next_loop_idx;//给连接分配那个EventLoop EventLoop* _base_loop;//主EventLoop,运行在主线程,从属EventLoop线程数量为0,则所有操作都在主EventLoop中进行 std::vector<LoopThread*> _threads;//保存所有LoopThread对象 std::vector<EventLoop*> _loops;//从属EventLoop线程数量大于0则从_loops进行线程EventLoop分配 public: LoopThreadPoll(EventLoop* loop):_thread_count(0),_next_loop_idx(0),_base_loop(loop){}; //设置从属EventLoop线程数量 void SetThreadCount(int count){ _thread_count = count; } //创建从属线程 void Create() { if(_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for(int i = 0; i < _thread_count; ++i) { _threads[i] = new LoopThread(); _loops[i] = _threads[i]->GetLoop(); } } } //获取从属线程EvetnLoop指针 EventLoop* NextLoop() { if(_thread_count == 0) { return _base_loop; } //RR轮转 _next_loop_idx = (_next_loop_idx + 1) % _thread_count; return _loops[_next_loop_idx]; }};
6. Connection模块
Connection模块:
目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成。
管理:
- 套接字的管理,能够进行套接字的操作
- 连接事件的管理,可读,可写,错误,挂断,任意
- 缓冲区的管理,便于socket数据的接收和发送
- 协议上下文的管理,记录请求数据的处理过程
- 回调函数的管理
a. 因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数
b. 一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数
c. 一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。
d. 任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数
功能:
- 发送数据—给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控
- 关闭连接—给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
- 启动非活跃连接的超时销毁功能
- 取消非活跃连接的超时销毁功能
- 协议切换—一个连接接收数据后如何进行业务处理,取决于上下文,以及数据的业务处理回调函数
// 通用类型Anyclass Any{ class holder { public: virtual ~holder() {} virtual const std::type_info &type() = 0; virtual holder *clone() = 0; }; template <class T> class placeholder : public holder { public: placeholder(const T &val) : _val(val) {} // 获取子类对象保存的数据类型 virtual const std::type_info &type() { return typeid(T); } // 针对当前的对象自身,克隆出一个新的子类对象 virtual holder *clone() { return new placeholder(_val); } public: T _val; };private: holder *_content;public: Any() : _content(nullptr) {} ~Any() { delete _content; } template <class T> Any(const T &val) { _content = new placeholder<T>(val); } Any(const Any &other) { _content = _content != nullptr ? other._content->clone() : nullptr; } // 返回子类对象保存的数据的指针 template <class T> T *Get() { assert(typeid(T) == _content->type()); // 父类指针指向子类中属于父类的,找不到子类中成员,因此需要转成子类指针 return &(((placeholder<T> *)_content)->_val); } Any &swap(Any &other) { std::swap(_content, other._content); return *this; } // 赋值运算符的重载函数 template <class T> Any &operator=(const T &val) { Any(val).swap(*this); return *this; } Any &operator=(const Any &other) { Any(other).swap(*this); return *this; }};class Connection;// DISCONNECTED 连接关闭状态 CCONNECTING 连接建立成功成功,待处理状态// CONNECTED 连接建立完成,各种设置已完成,可以通信状态 DISCONNECTING 连接待关闭状态typedef enum{ DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING} ConStatus;//使用shared_ptr管理每个Connection对象,防止用户把Connection在某个地方删除//最终导致程序奔溃退出using PtrConnection = std::shared_ptr<Connection>;//enable_shared_from_this里面保存了一个weak_ptr,可以从weak_ptr得到一个shared_ptrclass Connection: public std::enable_shared_from_this<Connection>{private: uint64_t _conn_id; // 连接唯一ID,便于连接的管理和查找 // uint64_t _timer_fd; // 定时器ID,必须是唯一,这块为了简化操作使用_conn_id作为定时器id int _sockfd; // 连接关联的文件描述符 bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为false EventLoop *_loop; // 连接所关联的一个EventLoop ConStatus _status; // 连接状态 Socket _socket; // 套接字管理 Channel _channel; // 连接事件管理 Buffer _in_buffer; // 输入缓存区,存放从socket中读取到的数据 Buffer _out_buffer; // 输出缓存区,存放经过业务处理要给对端发出的数据 Any _context; // 请求接收处理上下文 // 这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的) // 换句话说,这⼏个回调都是组件使用者使用的 //连接建立回调 using ConnectedCallback = std::function<void(const PtrConnection &)>; //接收到数据回调 using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>; //关闭连接回调 using ClosedCallback = std::function<void(const PtrConnection &)>; //任意事件回调 using AnyeEventCallback = std::function<void(const PtrConnection &)>; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyeEventCallback _event_callback; //组件内的连接关闭回调--组件内设置的,因为服务器组件内也会把所有的连接管理起来,⼀旦某个连接要关闭 //就应该从管理的地⽅移除掉自己的信息 ClosedCallback _server_closed_callback;private: //五个channel的事件回调函数 //描述符可读事件触发后调⽤的函数,接收socket数据放到接收缓冲区中,然后调⽤_message_callback进行业务处理 void HandleRead() { //1.接收socket缓存区数据,放到缓存区 char buffer[65535]; ssize_t ret = _socket.NonBlockRecv(buffer,65535); if(ret < 0) { //出错,不能直接关闭连接 return ShutdownInLoop(); } //这里的ret等于0表示的是没有读取到数据,⽽并不是连接断开了,连接断开返回的是-1 //将数据放⼊输⼊缓冲区,写⼊之后顺便将写偏移向后移动 _in_buffer.WriteAndPush(buffer,ret); //2.调用_message_callback进行业务处理 if(_in_buffer.ReadAbleSize() > 0) { if(_message_callback){ //shared_from_this--从当前对象自身获取自身的shared_ptr管理对象 return _message_callback(shared_from_this(),&_in_buffer); } } } //描述符可写事件出发后调用的函数,将发送缓存区中数据进行发送 void HandleWrite() { //_out_buffer中保存的是待发送的数据 ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(),_out_buffer.ReadAbleSize()); if(ret < 0) { //发送错误关闭连接,如果接收缓存区还有数据先处理一下 if(_in_buffer.ReadAbleSize() > 0) { if(_message_callback){ //shared_from_this--从当前对象自身获取自身的shared_ptr管理对象 _message_callback(shared_from_this(),&_in_buffer); } } //这时候就是实际的关闭释放操作了 //先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务 //return ReleaseInLoop(); return Release(); } //千万不要忘了,将读偏移向后移动 _out_buffer.MoveReadOffset(ret); if(_out_buffer.ReadAbleSize() == 0) { _channel.DisableWrite();// 没有数据待发送了,关闭写事件监控 if(_status == DISCONNECTING)//如果当前是连接待关闭状态,还有数据,发送完数据释放连接,没有数据则直接释放 { //先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务 //return ReleaseInLoop(); return Release(); } } return; } //描述符触发挂断事件 void HandleClose() { //⼀旦连接挂断了,套接字就什么都⼲不了了,因此有数据待处理就处理⼀下,完毕关闭连接 if(_in_buffer.ReadAbleSize() > 0) { if(_message_callback){ //shared_from_this--从当前对象⾃⾝获取⾃⾝的shared_ptr管理对象 _message_callback(shared_from_this(),&_in_buffer); } } //这时候就是实际的关闭释放操作了 //先把释放连接任务压到任务队列中,等所有就绪事件处理后,在处理任务队列中释放连接任务 //return ReleaseInLoop(); return Release(); } //描述符触发出错事件 void HandleError() { return HandleClose(); } //描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务; 2. 调⽤组件使用者的任意事件回调 void HandleEvent() { if(_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); } if(_event_callback) { _event_callback(shared_from_this()); } } //连接获取后,所处的状态下要进行各种设置(启动读事件监控,调用连接建立阶段回调函数) void EstablishedInLoop() { // 1. 修改连接状态 assert(_status == CONNECTING);//当前的状态必须⼀定是上层的半连接状态 _status = CONNECTED;//当前函数执⾏完毕,则连接进⼊已完成连接状态 // 2. 启动读事件监控 _channel.EnableRead();// ⼀旦启动读事件监控就有可能会⽴即触发读事件,如果这时候启动了⾮活跃连接销毁 //3. 调用组件调用者设置的连接建立阶段的回调函数 if(_connected_callback){ _connected_callback(shared_from_this()); } } //这个接口才是实际的释放接口 void ReleaseInLoop() { //1. 修改连接状态,将其置为DISCONNECTED _status == DISCONNECTED; //2. 移除连接的事件监控 _channel.Remove(); //3. 关闭描述符 _socket.Close(); //4. 如果当前定时器队列中还有定时销毁任务,则取消任务 if(_loop->HasTimer(_conn_id)) { CancelInactiveReleaseInLoop(); } //5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放, // 再去处理会出错,因此先调用用户的回调函数 if(_closed_callback){ _closed_callback(shared_from_this()); } //移除服务器内部管理的连接信息 if(_server_closed_callback){ _server_closed_callback(shared_from_this()); } } //这个接⼝并不是实际的发送接⼝,⽽只是把数据放到了发送缓冲区,启动了可写事件监控 void SendInLoop(Buffer& buf) { if(_status == DISCONNECTED) return; _out_buffer.WriteBufferAndPush(buf); if(_channel.WriteAble() == false) { _channel.EnableWrite(); } } //这个关闭操作并⾮实际的连接释放操作,需要判断还有没有数据待处理,待发送 void ShutdownInLoop() { // 设置连接为半关闭状态 _status = DISCONNECTING; if(_in_buffer.ReadAbleSize() > 0) { if(_message_callback){ _message_callback(shared_from_this(),&_in_buffer); } } //要么就是写⼊数据的时候出错关闭,要么就是没有待发送数据,直接关闭 if(_out_buffer.ReadAbleSize() > 0) { if(_channel.WriteAble() == false) { _channel.EnableWrite(); } } if(_out_buffer.ReadAbleSize() == 0) { //ReleaseInLoop(); Release(); } } //启动⾮活跃连接超时释放规则 void EnableInactiveReleaseInLoop(int sec) { //1. 将判断标志 _enable_inactive_release 置为true _enable_inactive_release = true; //2. 如果当前定时销毁任务已经存在,那就刷新延迟⼀下即可 if(_loop->HasTimer(_conn_id)) { return _loop->TimerRefresh(_conn_id); } //3. 如果不存在定时销毁任务,则新增 /* 业务处理超时,查看服务器的处理情况 当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间) 1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放 假设现在 12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度 1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度 2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉 这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误) 因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务队列中, 等到事件处理完了执行任务池中的任务的时候,再去释放*/ //_loop->TimerAdd(_conn_id,sec,std::bind(&Connection::ReleaseInLoop,this)); _loop->TimerAdd(_conn_id,sec,std::bind(&Connection::Release,this)); } void CancelInactiveReleaseInLoop() { _enable_inactive_release = false; if(_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); } } void UpgradeInLoop(const Any& context,const ConnectedCallback& conn,const MessageCallback& msg, const ClosedCallback& closed,const AnyeEventCallback& event) { _context = context; _connected_callback = conn; _message_callback = msg; _closed_callback = closed; _event_callback = event; }public: Connection(EventLoop* loop,uint64_t conn_id,int sockfd) :_loop(loop) ,_conn_id(conn_id) ,_sockfd(sockfd) ,_enable_inactive_release(false) ,_status(CONNECTING) ,_socket(_sockfd) ,_channel(_loop,_sockfd) { _channel.SetReadCallback(std::bind(&Connection::HandleRead,this)); _channel.SetWriteCallback(std::bind(&Connection::HandleWrite,this)); _channel.SetErrorCallback(std::bind(&Connection::HandleError,this)); _channel.SetCloseCallback(std::bind(&Connection::HandleClose,this)); _channel.SetEventCallback(std::bind(&Connection::HandleEvent,this)); } ~Connection() { LOG_DEBUG(\"RELEASE CONNECTION:%p\", this); } //获得管理的文件描述符 int Fd() { return _sockfd; } //获取连接ID int Id() { return _conn_id; } //当前是否处于CONNECTED状态 bool Connected() { return (_status == CONNECTED); } //设置上下文--连接建立完成时进行调用 void SetContent(const Any& context) { _context = context; } //获取上下文 Any* GetContent() { return &_context; } void SetConnectedCallback(const ConnectedCallback& cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback& cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback& cb) { _closed_callback = cb; } void SetAnyeEventCallback(const AnyeEventCallback& cb) { _event_callback = cb; } void SetSevClosedCallback(const ClosedCallback& cb) { _server_closed_callback = cb; } //多线程执行,涉及到线程安全的问题,我们都放在EventLoop线程下的任务队列中执行 //连接建立就绪后,启动读监控,调用_connected_callback void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this)); } //发送数据,实际是将数据放到发送缓存区,启动写事件监控 void Send(const void* data,size_t len) { //外界传⼊的data,可能是个临时的空间,我们现在只是把发送操作压⼊了任务池,有可能并没有被⽴即执⾏ //因此有可能执⾏的时候,data指向的空间有可能已经被释放了 Buffer buf; buf.WriteAndPush(data,len); //std::move(buf))是将右值参数(如 std::move(buf))会被移动构造到 std::bind 内部,生成一个 独立存储的 Buffer 对象。 //当异步任务执行时,std::bind 会将存储的 Buffer 对象以 \"左值\" 形式传递给 SendInLoop //所以void SendInLoop(Buffer& buf),Buffer应为左值引用 //并且这里std::move(buf))想要减少一次拷贝,减少的是将buf拷贝到bind内部生成Buffer对象这次拷贝 _loop->RunInLoop(std::bind(&Connection::SendInLoop,this,std::move(buf))); } //提供给组件使用者的关闭窗口,并不是真实关闭,需要判断输入输出有没有数据待处理 void Shutdown() { _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop,this)); } void Release() { //定时超时释放连接的任务,压到任务队列中 //甚至可以所有的释放连接的地方都先先把任务压到任务队列中,等所有就绪事件处理自后再去处理任务队列中的任务 //这样可以把Channel类中HandleEvent中改改,不用每个事件执行之前都先去执行任意事件的回调,而是最后在执行任意事件回调 //这样更符合逻辑,当事件就绪处理自后在刷新活跃度,如果超时在释放 //现在不会在处理就绪事件的时候去释放连接了,所有不用担心因为释放连接销毁Connection对象而导致调用任意事件回调而导致程序奔溃了 _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop,this)); } //启动非活跃消费,并定义多长时间五通信就是非活跃,添加定时任务 void EnableInactiveRelease(int sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this,sec)); } //取消非活跃销毁 void CancelInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop,this)); } //切换协议---重置上下⽂以及阶段性回调处理函数 -- ⽽是这个接⼝必须在EventLoop线程中⽴即执⾏ //防备新的事件触发后,处理的时候,切换任务还没有被执⾏--会导致数据使⽤原协议处理了。 void Upgrade(const Any& content,const ConnectedCallback& conn,const MessageCallback& msg, const ClosedCallback& closed,const AnyeEventCallback& event) { _loop->AssertInLoop(); _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,content,conn,msg,closed,event)); }};
7. Acceptor模块
Acceptor模块:对监听套接字进行管理
- 创建一个监听套接字
- 启动读事件监控
- 事件触发后,获取新连接
- 调用新连接获取成功后的回调函数
- 为新连接创建Connection进行管理(这一步不是Acceptor模块操作,应该是服务器模块)
因为Acceptor模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心
对于新连接如何处理,应该是服务器模块来管理的
服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数
//Acceptor模块,对监听套接字进行管理class Acceptor{ private: EventLoop* _loop;//对监听套接字进行监控 Socket _socket;//创建监听套接字 Channel _channel;//对监听套接字进行事件管理 using AcceptCallback = std::function<void(int)>; AcceptCallback _accept_callback; private: //监听套接字读事件就绪回调函数 --- 获取新连接,调用_accept_callback函数进行新连接处理 void HandleRead() { int newfd = _socket.Accpet(); if(newfd < 0) { return; } if(_accept_callback) _accept_callback(newfd); } int CreateServer(uint16_t port) { int ret = _socket.CreateServer(port); assert(ret == true); return _socket.Fd(); } public: Acceptor(EventLoop* loop,uint16_t port):_loop(loop),_socket(CreateServer(port)),_channel(_loop,_socket.Fd()) { _channel.SetReadCallback(std::bind(&Acceptor::HandleRead,this)); } void SetAcceptCallback(const AcceptCallback& cb) { _accept_callback = cb; } //不能将启动读事件监控,放到构造函数,必须设置在回调函数后,再去启动 //否则有可能造成启动监控之后,立刻就有了事件,处理的时候,回调函数还没有设置,新连接得不到处理,且资源泄漏 void Listen() { _channel.EnableRead(); } };
8. TcpServer模块
TcpServer模块:对所有模块的整合,通过TcpServer模块实例化的对象,可以非常简单的完成一个服务器的搭建
管理:
- Acceptor对象,创建一个监听套接字
- EventLoop对象,baseloop对象,实现对监听套接字的事件监控
- std:unordered_map_conns,实现对所有新建连接的管理
- LoopThreadPool对象,创建loop线程池,对新建连接进行事件监控及处理
功能:
- 设置从属线程池数量
- 启动服务器
- 设置各种回调函数(连接建立完成,消息,关闭,任意),用户设置给TcpServer,TcpServer设置给获取的新连接
- 是否启动非活跃连接超时销毁功能
- 给baseloop添加定时任务功能(如果用户需要的话可以设置)
流程:
- 在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象(baseloop)
- 将Acceptor挂到baseloop上进行事件监控
- 一旦Acceptor对象就绪了可读事件,则执行读事件回调函数获取新建连接
- 对新连接,创建一个Connection进行管理
- 对连接对应的Connection设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)
- 启动Connection的非活跃连接的超时销毁规则
- 将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的Eventloop中进行事件监控
- 一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调
class TcpServer{ private: uint64_t _next_id; //这是一个自动增长的连接ID uint16_t _port;//服务器端口 int _timeout;//这是非活跃连接的统计时间---多长时间无通信就是非活跃连接 bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志 EventLoop _baseloop;//这是主线程的EventLoop对象,负责监听事件的处理 Acceptor _acceptor;//这是监听套接字的管理对象 LoopThreadPoll _pool;//这是从属EventLoop线程池 std::unordered_map<uint64_t,PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象 using Functor = std::function<void()>; //连接建立回调 using ConnectedCallback = std::function<void(const PtrConnection &)>; //接收到数据回调 using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>; //关闭连接回调 using ClosedCallback = std::function<void(const PtrConnection &)>; //任意事件回调 using AnyeEventCallback = std::function<void(const PtrConnection &)>; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyeEventCallback _event_callback; private: //为新连接构造一个Connection进行管理 void NewConnection(int fd) { LOG_DEBUG(\"NEW CONNECTION\"); _next_id++; PtrConnection conn(new Connection(_pool.NextLoop(),_next_id,fd)); conn->SetConnectedCallback(_connected_callback); conn->SetClosedCallback(_closed_callback); conn->SetMessageCallback(_message_callback); conn->SetAnyeEventCallback(_event_callback); conn->SetSevClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1)); if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//启动非活跃销毁 conn->Established();//就绪初始化 _conns.insert(std::make_pair(_next_id,conn)); } void RemoveConnectionInLoop(const PtrConnection& conn) { //int id = conn->Id(); auto it = _conns.find(conn->Id()); if(it != _conns.end()) { _conns.erase(it); } } //给Connection模块的_server_closed_callback设置一个回调函数 //从管理Connection的_conns中移除管理信息 //这里才是真正释放Connection的地方 void RemoveConnection(const PtrConnection& conn) { _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn)); } void RunAfterInLoop(const Functor& task, int delay) { _next_id++; _baseloop.TimerAdd(_next_id,delay,task); } public: TcpServer(uint16_t port) :_port(port) ,_next_id(0) ,_timeout(0) ,_enable_inactive_release(false) ,_acceptor(&_baseloop,_port) ,_pool(&_baseloop) { _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1)); _acceptor.Listen(); } //设置从属EventLoop线程数量 void SetThreadCount(int count) { _pool.SetThreadCount(count); } void SetConnectedCallback(const ConnectedCallback& cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback& cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback& cb) { _closed_callback = cb; } void SetAnyeEventCallback(const AnyeEventCallback& cb) { _event_callback = cb; } //启动非活跃定时销毁任务 void EnableInactiveRelease(int timeout) { _enable_inactive_release = true; _timeout = timeout; } //提供给主线程添加定时任务的接口,是否调用由用户决定 void RunAfter(const Functor& task, int delay) { _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,delay)); } //启动服务器 void Start() { //创建从属EventLoop线程池 _pool.Create(); //启动监听套接字读事件监控 _baseloop.Statr(); } };