【Linux】高性能网络模式:Reactor 反应堆模式
🌈 个人主页:Zfox_
🔥 系列专栏:Linux
目录
- 一 :🔥 Reactor 模式
-
- 🦋 Reactor 模式的核心思想
- 🦋 Reactor 模式的组成
- 二 :🔥案例使用 -- 基于`Reactor`的计算器
-
- 🦋 封装 epoll
- 🦋 封装 Connection
- 🦋 封装 Reactor
- 🦋 封装 Listener(专门负责获取连接的模块)
- 🦋 封装 IOService(只负责IO的)
- 🦋 Main.cc
- 三 :🔥 复盘分析
-
- 🦋 多路转接对写的处理
- 🦋 最佳实践 `One Thread One loop`
- 四 :🔥 共勉
一 :🔥 Reactor 模式
🐳 Reactor 模式 \\colorbox{cyan}{ 🐳 Reactor 模式 } 🐳 Reactor 模式 是一种事件处理设计模式,用于处理多个并发输入事件
。它通过事件驱动的方式,将事件分发给相应的处理程序,从而实现对并发事件的高效处理。Reactor
模式广泛应用于网络编程、服务器框架等地方,例如 Java 的 NIO、Netty 框架,以及 C++ 的 Boost.Asio 等。
🦋 Reactor 模式的核心思想
Reactor
模式的核心思想是:
- 事件驱动:通过事件循环(
Event Loop
)监听多个事件源(多个文件描述符)(如 Socket、文件描述符等),该事件驱动器可以采用select
,poll
,epoll
等。 - 事件分发:当事件发生时,
Reactor
将事件分发给对应的事件处理器(Event Handler
)。 - 非阻塞:
Reactor
模式通常与 非阻塞 I/O 结合使用,避免线程阻塞,所以需要将 Socket、文件描述符等通过fcntl
函数设置为非阻塞状态。
🦋 Reactor 模式的组成
Reactor
模式通常由以下几个组件组成:
Loop
(反应器 / 事件循环)
- 负责监听事件源(多个文件描述符)(如 Socket、文件描述符等)。
- 当事件发生时,将事件分发给对应的事件处理器。
Dispatcher
(事件多路分发器)
- 当事件发生时,通知
Event Handler
进行处理。
-
Event Handler
(事件处理器)
- 定义处理事件的接口。
- 每个事件源对应一个事件处理器。
二 :🔥案例使用 – 基于Reactor
的计算器
🦋 封装 epoll
#pragma once#include #include #include \"Log.hpp\"#include \"Common.hpp\"using namespace LogModule;namespace EpollMoudle{ class Epoller { public: Epoller() : _epfd(-1) {} void Init() { _epfd = epoll_create(256); if(_epfd < 0) { LOG(LogLevel::ERROR) << \"epoll_create error\"; exit(EPOLL_CREATE_ERR); } LOG(LogLevel::INFO) << \"epoll_create success, epfd: \" << _epfd; } int Wait(struct epoll_event revs[], int num, int timeout) // 输出就绪的fd 和 events { int n = epoll_wait(_epfd, revs, num, timeout); if(n < 0) { LOG(LogLevel::WARNING) << \"epoll_wait error\"; } return n; } void Ctrl(int sockfd, uint32_t events, int flag) { struct epoll_event ev; ev.events = events; ev.data.fd = sockfd; int n = epoll_ctl(_epfd, flag, sockfd, &ev); if (n < 0) { LOG(LogLevel::WARNING) << \"epoll_ctl error\"; } } void Add(int sockfd, uint32_t events) { Ctrl(sockfd, events, EPOLL_CTL_ADD); } void Update(int sockfd, uint32_t events) { Ctrl(sockfd, events, EPOLL_CTL_MOD); } void Delete(int sockfd) { int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, nullptr); if (n < 0) { LOG(LogLevel::WARNING) << \"epoll_ctl error\"; } } ~Epoller() {} private: int _epfd; };}
🦋 封装 Connection
所有的连接里都包含回指指针 指向自己的 Reactor 模型 \\colorbox{pink}{ 所有的连接里都包含回指指针 指向自己的 Reactor 模型} 所有的连接里都包含回指指针 指向自己的 Reactor 模型
#pragma once#include #include #include #include #include \"InetAddr.hpp\"class Reactor; // 循环依赖的问题// 普通fd,Listensockfd// 让对 fd 的处理方式采用同一种方式// 描述一个连接class Connection{public: Connection() : _sockfd(-1), _events(0) { // 自动获得当前系统的时间戳 } void UpdateTime() { // 更新时间, 重新获取时间 } void SetPeerInfo(const InetAddr &peer_addr) { _peer_addr = peer_addr; } void SetSockfd(int sockfd) { _sockfd = sockfd; } int Sockfd() { return _sockfd; } void SetEvents(uint32_t events) { _events = events; } uint32_t GetEvents() { return _events; } void SetOwner(Reactor *owner) { _owner = owner; } Reactor* GetOwner() { return _owner; } void Append(const std::string &in) // 把收到的数据添加到自己的接受缓冲区 { _inbuffer += in; } void AppendToOut(const std::string &out) { _outbuffer += out; } void DiscardOutString(int n) { _outbuffer.erase(0, n); } bool isOutBufferEmpty() { return _outbuffer.empty(); } std::string &OutString() { return _outbuffer; } std::string &InBuffer() // 故意 { return _inbuffer; } void Close() { if(_sockfd >= 0) close(_sockfd); } // 回调方法 virtual void Sender() = 0; virtual void Recver() = 0; virtual void Excepter() = 0; ~Connection() { }private: int _sockfd; // 每一个文件描述符都有自己的输入输出缓冲区 std::string _inbuffer; std::string _outbuffer; InetAddr _peer_addr; // 对应哪一个客户端 // 添加一个指针 Reactor *_owner; // 我关心的事件 uint32_t _events; // 我这个 connection 关心的事件 // lastmodtime uint64_t _timestamp;};
🦋 封装 Reactor
#pragma once#include #include #include #include \"Epoller.hpp\"#include \"Connection.hpp\"using namespace EpollMoudle;using connection_t = std::shared_ptr<Connection>;class Reactor{ const static int event_num = 64;private: bool IsConnectionExists(int sockfd) { return _connections.find(sockfd) == _connections.end() ? false : true; }public: Reactor() : _isrunning(false), _epoller(std::make_unique<Epoller>()) { _epoller->Init(); } void InsertConnection(connection_t conn) { auto iter = _connections.find(conn->Sockfd()); if (iter == _connections.end()) { // 1. 把连接,放到unordered_map中进行管理 _connections.insert(std::make_pair(conn->Sockfd(), conn)); // 2. 把新插入进来的连接,写透到内核的epoll中 _epoller->Add(conn->Sockfd(), conn->GetEvents()); // 3. 设置关联关系,让connection回指当前对象 conn->SetOwner(this); } } void EnableReadWrite(int sockfd, bool readable, bool writeable) { if(IsConnectionExists(sockfd)) { // 修改用户层connection的事件 uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET); _connections[sockfd]->SetEvents(events); // 写透到内核中 _epoller->Update(sockfd, _connections[sockfd]->GetEvents()); } } void DelConnection(int sockfd) { if(IsConnectionExists(sockfd)) { // 1. 从内核中移除对sockfd的关心 _epoller->Delete(sockfd); // 2. 关闭特定的文件描述符 _connections[sockfd]->Close(); // 3. 从_connectionns中移除对应的connection _connections.erase(sockfd); } } // 基于事件驱动的事件派发器 void Dispatcher(int n) { for (int i = 0; i < n; i++) { // 开始进行派发, 派发给指定的模块 int sockfd = _revs[i].data.fd; uint32_t revents = _revs[i].events; if ((revents & EPOLLERR) || (revents & EPOLLHUP)) revents = (EPOLLIN | EPOLLOUT); // 异常事件,转换成为读写事件 if ((revents & EPOLLIN) && IsConnectionExists(sockfd)) { _connections[sockfd]->Recver(); } if ((revents & EPOLLOUT) && IsConnectionExists(sockfd)) { _connections[sockfd]->Sender(); } } } void LoopOnce(int timeout) { int n = _epoller->Wait(_revs, event_num, timeout); Dispatcher(n); } void DebugPrint() { std::cout << \"Epoller 管理的fd: \"; for(auto &iter : _connections) { std::cout << iter.first << \" \"; } std::cout << std::endl; } void Loop() { _isrunning = true; // int timeout = -1; int timeout = 1000; while (_isrunning) { LoopOnce(timeout); DebugPrint(); // 超时管理 // 简单的,遍历_connections 判断当前时间 - connection 的最近访问时间 > XXX // 超时了 } _isrunning = false; } void Stop() { _isrunning = false; } ~Reactor() { }private: std::unique_ptr<Epoller> _epoller; std::unordered_map<int, connection_t> _connections; // fd: Connection 服务器内部所有的连接 bool _isrunning; struct epoll_event _revs[event_num];};
🦋 封装 Listener(专门负责获取连接的模块)
#pragma once#include #include #include \"Socket.hpp\"#include \"InetAddr.hpp\"#include \"Connection.hpp\"#include \"Epoller.hpp\"#include \"Log.hpp\"#include \"IOService.hpp\"#include \"Reactor.hpp\"#include \"Protocol.hpp\"#include \"Calculator.hpp\"using namespace SocketModule;using namespace LogModule;// 专门负责获取连接的模块// 连接管理器class Listener : public Connection{public: Listener(uint16_t port) : _listensock(std::make_unique<TcpSocket>()), _port(port) { _listensock->BuildTcpSocketMethod(_port); SetSockfd(_listensock->Fd()); SetEvents(EPOLLIN | EPOLLET); } virtual void Sender() override { } // 我们回调到这里 天然就有父类connection virtual void Recver() override { // 读就绪,而且是listensock就绪 // IO处理 --- 获取新连接 // 你怎么知道 一次来的就是一个连接呢 怎么保证一次读完了 ET工作模式 ! while(true) { InetAddr peer; int aerrno = 0; // accept 非阻塞的时候,就是IO,我们就像处理read一样,处理accept int sockfd = _listensock->Accepter(&peer, &aerrno); if(sockfd > 0) { // success // 不能直接读取 ! // sockfd 添加到epoll ! // epollserver只认connection LOG(LogLevel::DEBUG) << \"Accepter success: \" << sockfd; // 普通的文件描述符, 处理IO的 也是connection // 2. sockfd包装成为 Connection ! auto conn = std::make_shared<IOService>(sockfd); conn->RegisterOnMessage(HandlerRequest); // 3. 插入到EpollServer GetOwner()->InsertConnection(conn); } else { if(aerrno == EAGAIN || aerrno == EWOULDBLOCK) { LOG(LogLevel::DEBUG) << \"accepter all connection ... done\"; break; } else if(aerrno == EINTR) { LOG(LogLevel::DEBUG) << \"accepter intr by signal, continue\"; continue; } else { LOG(LogLevel::DEBUG) << \"accepter error ... Ignore\"; break; } } } } virtual void Excepter() override { } int Sockfd() { return _listensock->Fd(); } ~Listener() { _listensock->Close(); }private: std::unique_ptr<Socket> _listensock; uint16_t _port;};
🦋 封装 IOService(只负责IO的)
#pragma once#include #include #include #include \"Socket.hpp\"#include \"InetAddr.hpp\"#include \"Connection.hpp\"#include \"Epoller.hpp\"#include \"Log.hpp\"#include \"Reactor.hpp\"using func_t = std::function<std::string(std::string &)>;using namespace SocketModule;using namespace LogModule;// 只负责IOclass IOService : public Connection{ static const int size = 1024;public: IOService(int sockfd) { // 1. 设置文件描述符非阻塞 SetNonBlock(sockfd); SetSockfd(sockfd); SetEvents(EPOLLIN | EPOLLET); } virtual void Sender() override { // UpdateTime(); // 直接写 while(true) { ssize_t n = send(Sockfd(), OutString().c_str(), OutString().size(), 0); if(n > 0) { // 成功 DiscardOutString(n); // 移除 N个 } else if(n == 0) { break; } else { if(errno == EAGAIN || errno == EWOULDBLOCK) { // 缓冲区写满了,下次再来 break; } else if(errno == EINTR) { continue; } else { Excepter(); return ; } } } // 一种:outbuffer empty // 一种:发送缓冲区写满了 && outbuffer没有empty 写条件不满足 使能 sockfd 在 epoll 中的事件 if(!isOutBufferEmpty()) { // 修改对sockfd 的读事件关心! -- 开启对写事件关心 // 按需设置! GetOwner()->EnableReadWrite(Sockfd(), true, true); // 读事件一般常设 写事件一般按需设置 } else { GetOwner()->EnableReadWrite(Sockfd(), true, false); // 按需设置 关闭了 !!! } } virtual void Recver() override { // UpdateTime(); // 1. 读取所有数据 while(true) // ET模式 { char buffer[size]; ssize_t s = recv(Sockfd(), buffer, sizeof(buffer) - 1, 0); // 非阻塞 if(s > 0) { buffer[s] = 0; // 读取成功 Append(buffer); } else if(s == 0) { // 对端关闭连接 Excepter(); return ; } else { if(errno == EAGAIN || errno == EWOULDBLOCK) { break; } else if(errno == EINTR) { continue; } else { // 发生错误了 Excepter(); return ; } } } // 走到下面,我一定把本轮数据读完了 std::cout << \"outbuffer: \\n\" << InBuffer() << std::endl; // 你能确保你读到的消息,就是一个完整的报文吗 // 我怎么知道 读到了完整的请求呢?? 协议 ! ! ! std::string result; if(_on_message) result = _on_message(InBuffer()); // 添加应答信息 AppendToOut(result); // 如何处理写的问题 outbuffer 发送给对方的问题 if(!isOutBufferEmpty()) { // 方案一: Sender(); // 直接发送,推荐做法 // Sender(); // 方案二: 使能writeable即可 GetOwner()->EnableReadWrite(Sockfd(), true, true); } } virtual void Excepter() override { // IO读取的时候,所有的异常处理,全部都会转化成为这一个函数的调用 // 出异常 怎么做? // 打印日志 差错处理 关闭连接, Reactor异常connection,从内核中,移除对fd的关心 LOG(LogLevel::INFO) << \"客户端连接可能结束,进行异常处理: \" << Sockfd(); GetOwner()->DelConnection(Sockfd()); } void RegisterOnMessage(func_t on_message) { _on_message = on_message; } ~IOService() {}private: func_t _on_message;};
🦋 Main.cc
#include #include #include \"Log.hpp\"#include \"Listener.hpp\"#include \"Connection.hpp\"#include \"Reactor.hpp\"using namespace LogModule;int main(int argc, char *argv[]){ ENABLE_CONSOLE_LOG(); if (argc != 2) { std::cout << \"Usage: \" << argv[0] << \" port\" << std::endl; return 0; } uint16_t local_port = std::stoi(argv[1]); Reactor reactor; auto conn = std::make_shared<Listener>(local_port); reactor.InsertConnection(conn); reactor.Loop(); return 0;}
三 :🔥 复盘分析
🦋 多路转接对写的处理
- 写事件是否就绪:发送缓冲器,是否有空间 !
有空间。sockfd,发送缓冲区中的空间,默认就是有的!!!,只有当他被填写满了,写条件才不具备
- 如何正确处理写入:直接写(不会被阻塞),写缓冲区可能会越来越短,写满了,写条件不具备,才托管给
epoll
,让它帮我关心 !!!
- 直接写!写入默认就是就绪的 !
- 写入失败,才托管给
epoll
! - 多路转接的方案设计的时候,写事件关心,永远不能常开启。
- 写事件关心 , 按需设置 !
- 读事件一般常设,写事件一般按需设置 !
- 只要使能开启写,
内核+epoll
会自动事件派发处理底层的发送任务。 - 从此往后,我们专注于业务即可 !
🦋 最佳实践 One Thread One loop
🦈 一个 fd (Connection)
的全生命周期,只能由一个线程统一管理,这样就不会存在任何(过多)并发问题
🧊 一个
loop
就是 一个Reactor
四 :🔥 共勉
😋 以上就是我对 【Linux】高性能网络模式:Reactor 反应堆模式
的理解, 觉得这篇博客对你有帮助的,可以点赞收藏关注支持一波~ 😉