简单模仿下muduo网络库的封装
文章目录
-
- 一、基础socket编程
- 二、抽象与层次
-
- 2.1 InetAddress封装
- 2.2 Socket封装
- 2.3 Epoll封装
- 2.4 Channel封装
- 2.5 Acceptor封装
- 2.6 Connection封装
- 2.7 Threadpool封装
- 2.8 Eventloop封装
- 2.9 TCPserver封装
- 三、小结
一、基础socket编程
网络编程的底层离不开socket,其处理流程表示如下:
int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serv_addr;bzero(&serv_addr, sizeof(serv_addr));serv_addr.sin_family = AF_INET;serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");serv_addr.sin_port = htons(8888);bind(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr));listen(sockfd, SOMAXCONN);struct sockaddr_in clnt_addr;socklen_t clnt_addr_len = sizeof(clnt_addr);bzero(&clnt_addr, sizeof(clnt_addr));int clnt_sockfd = accept(sockfd, (sockaddr*)&clnt_addr, &clnt_addr_len);
当然,目前常用的服务器架构都离不开epoll的帮助,其常用处理逻辑如下:
int epfd = epoll_create(0);while(1){int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);for(int i = 0; i < nfds; ++i){// solve...}}
上述的做法显然是一种C语言的处理方式,并且所有逻辑都放在一起处理,这容易使得我们的程序越来越庞大和臃肿。此时我们需要对程序进行模块化,每一个模块专门处理一个任务,这样可以增加程序的可读性,也可以写出更大庞大、功能更加复杂的程序。
本文主要仿照muduo网络库的底层封装,设计一个烂大街的echo服务器。
二、抽象与层次
2.1 InetAddress封装
新建立服务器时通常要有绑定协议类型、IP、端口等固定操作,采用InetAddress类进行封装。
class InetAddress { public: InetAddress() = default; InetAddress(const char *ip, uint16_t port); ~InetAddress() = default; void SetInetAddr(sockaddr_in _addr); struct sockaddr_in GetAddr(); const char* GetIp(); uint16_t GetPort();private: struct sockaddr_in addr_;};
这样在后面使用时只需要下面一条语句即可:
InetAddress *addr = new InetAddress("127.0.0.1", 8888);
2.2 Socket封装
socket封装的原因主要在于socke、listen、bind等固定且繁琐操作,于是采用Socket类来进行封装。
class Socket {private: int fd_;public: Socket(); explicit Socket(int); ~Socket(); void Bind(InetAddress *addr); void Listen(); void Setnonblocking(); bool IsNonBlocking(); void Setreuseaddr(); void Setreuseport(); int Accept(InetAddress *addr); void Connect(InetAddress *addr); void Connect(const char *ip, uint16_t port); int GetFd();};
这样我们只需要这样使用即可:
sock_ = new Socket();sock_->Setreuseaddr();sock_->Setreuseport();sock_->Bind(addr);sock_->Listen();
2.3 Epoll封装
epoll的使用同样离不开几个固定的套路,从epoll_create、epoll_ctl到epoll_wait,我们采用以下方法来封装:
class Epoll {public: Epoll(); ~Epoll(); void UpdateChannel(Channel * ch); void DeleteChannel(Channel * ch); std::vector<Channel*> Poll(int timeout = -1);private: int epfd_; struct epoll_event *events_;};
2.4 Channel封装
2.3中主要对epoll的一些操作进行封装,而后续在建立一个新连接时,我们需要将其添加到红黑树中,后续当该连接上发生事件时,我们需要使用不同的处理方式来应对,于是乎,我们采用了一个Channel类,每个Chanel只会对一个fd负责,对不同的事件类型设置不同的处理逻辑。
class Channel {public: Channel(EventLoop *_loop, int _fd); ~Channel(); void HandleEvent(); void EnableRead(); int GetFd(); uint32_t GetListenEvents(); uint32_t GetReadyEvents(); bool GetInEpoll(); void SetInEpoll(bool _in = true); void UseET(); void SetReadyEvents(uint32_t ev); void SetReadCallback(std::function<void()> const &cb);private: EventLoop *loop_; int fd_; uint32_t listen_events_; uint32_t ready_events_; bool in_epoll_; std::function<void()> read_callback_; std::function<void()> write_callback_;};
值得注意的是,我们采用function/bind
的回调方法作为类和类之间沟通的方法,给每个不同的channel绑定不同的read/write方法,针对不同fd实现对应的处理逻辑,实例如下:
acceptChannel_ = new Channel(loop_, sock_->GetFd());std::function<void()> cb = std::bind(&Acceptor::AcceptConnection, this);acceptChannel_->SetReadCallback(cb);acceptChannel_->EnableRead();
2.5 Acceptor封装
对于每个客户,首先需要做的事都是调用accept()
函数接受这个TCP连接,然后将socket文件描述符添加到epoll,当这个IO口有事件发生的时候,再对此TCP连接提供相应的服务。因此,我们可以添加一个Acceptor类,该类拥有一个独特的accept fd,也通过一个独有的Channel
负责分发到epoll,该Channel的事件处理函数read_callback_()
会调用Acceptor中的接受连接函数AcceptConnection()
来新建一个TCP连接。
class Acceptor{public: explicit Acceptor(EventLoop *loop); ~Acceptor(); void AcceptConnection(); void SetNewConnectionCallback(std::function<void(Socket*)> const &cb);private: EventLoop *loop_; Socket *sock_; Channel *acceptChannel_; std::function<void(Socket*)> newConnectionCallback_;};
在Acceptor类中使用的newConnectionCallback_回调则是用来处理新建立的TCP连接的,在其他类中写好回调逻辑即可通过void SetNewConnectionCallback(std::function const &cb)
完成目的。
2.6 Connection封装
在2.5节中,我们将accept抽象出来封装成了一个类,在accept后得到的一个新的TCP连接,该连接在四次挥手前将一直存在,于是我们也将一个TCP连接也浅浅的抽象封装一下。
class Connection{public: Connection(EventLoop *loop, Socket *sock); ~Connection(); void Read(); void Write(); void SetDeleteConnectionCallback(std::function<void(Socket *)> const &callback); void SetOnConnectCallback(std::function<void(Connection *)> const &callback); void Close(); void SetSendBuffer(const char *str); Buffer *GetReadBuffer(); const char *ReadBuffer(); Buffer *GetSendBuffer(); const char *SendBuffer(); void GetlineSendBuffer(); Socket *GetSocket(); void OnConnect(std::function<void()> fn);private: EventLoop *loop_; Socket *sock_; Channel *channel_; State state_; Buffer *readBuffer_; Buffer *writeBuffer_; std::function<void(Socket *)> deleteConnectionCallback_; std::function<void(Connection *)> onConnectCallback_; void ReadNonBlocking(); void WriteNonBlocking(); void ReadBlocking(); void WriteBlocking();};
其中,deleteConnectionCallback_
与onConnectCallback_
两个回调函数则是用于每个连接的释放和业务处理,在TCP连接建立时通过SetDeleteConnectionCallback()
与SetOnConnectCallback()
进行设置。
2.7 Threadpool封装
在服务器设计中,线程池一定是最重要的模块之一,当某个连接上有事件触发时,我们直接将其丢给工作线程去处理,这能够很大程度上提高服务器的性能。
class Threadpool{private: std::vector<std::thread> threads_; std::queue<std::function<void()>> tasks_; std::mutex tasks_mtx_; std::condition_variable cv_; bool stop_;public: explicit Threadpool(int size = std::thread::hardware_concurrency()); ~Threadpool(); template<class F, class... Args> auto Add(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;};template<class F, class... Args>auto Threadpool::Add(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(tasks_mtx_); // don't allow enqueueing after stopping the pool if(stop_) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks_.emplace([task](){ (*task)(); }); } cv_.notify_one(); return res;}Threadpool::Threadpool(int size) : stop_(false){ for(int i = 0; i < size; ++i){ threads_.emplace_back(std::thread([this](){ while(true){ std::function<void()> task; { std::unique_lock<std::mutex> lock(tasks_mtx_); cv_.wait(lock, [this](){ return stop_ || !tasks_.empty(); }); if(stop_ && tasks_.empty()) return; task = tasks_.front(); tasks_.pop(); } task(); } })); }}Threadpool::~Threadpool(){ { std::unique_lock<std::mutex> lock(tasks_mtx_); stop_ = true; } cv_.notify_all(); for(std::thread &th : threads_){ if(th.joinable()) th.join(); }}
上述封装的线程池使用右值移动、完美转发等阻止拷贝,另外使用add
函数前不需要手动绑定参数,而是直接传递,并且可以得到任务的返回值。
2.8 Eventloop封装
上述类实际上都是对一些底层设置以及抽象流程的封装,但目前服务器是一个顺序的处理结构,当我们的服务器结构越来越庞大、功能越来越复杂、模块越来越多,这种顺序程序设计的思想显然是不能满足需求的。由此引入了两种服务器的开发模式,Reactor和Proactor模式。
本文的服务器模式为Reactor模式,同时给出两种不同的reactor模式,下面以饭店案例来说明:
1、单reactor多线程,1个前台接待,多个服务员,接待员只负责接待。
2、主从reactor多线程,多个前台接待,多个服务员。
本节给出的时主从reactor模式,先来张图说明下:
也就是说,主reactor负责线程的accept,将得到的新连接交给其余的从reactor进行处理,代码如下:
class EventLoop {public: EventLoop(); ~EventLoop(); void Loop(); void UpdateChannel(Channel*);private: Epoll *ep_; bool quit_;};void EventLoop::Loop(){ while(!quit_){ std::vector<Channel*> chs; chs = ep_->Poll(); for(auto &it : chs) { it->HandleEvent(); } }}// 实例如下:acceptor_ = new Acceptor(mainReactor_);int size = std::thread::hardware_concurrency();thpool_ = new Threadpool(size);for(int i = 0; i < size; ++i) subReactors_.push_back(new EventLoop());for(int i = 0; i < size; ++i){ std::function<void()> sub_loop = std::bind(&EventLoop::Loop, subReactors_[i]); thpool_->Add(std::move(sub_loop));}
显然,每个reactor都是一个事件处理循环,主从循环都会一直从事着自己的本职工作。
2.9 TCPserver封装
综上所述,我们已经将服务器的核心类进行了封装,但是各个类的管理还是不太合理,这里我们又构造了一个TCPserver类,如下:
class TCPserver{private: EventLoop *mainReactor_; Acceptor *acceptor_; std::map<int, Connection*> connections_; std::vector<EventLoop*> subReactors_; Threadpool *thpool_; std::function<void(Connection *)> onConnectCallback_;public: explicit TCPserver(EventLoop* loop); ~TCPserver(); void NewConnection(Socket *sock); void DeleteConnection(Socket *sock); void OnConnect(std::function<void(Connection *)> fn);};
三、小结
本文主要尝试对muduo底层封装进行探究,很多地方可能理解的不是很到位,希望同学们谅解并指出问题所在哈!
完整代码:https://download.csdn.net/download/hjlogzw/85264548
参考博客:
https://zhuanlan.zhihu.com/p/347779760
https://blog.csdn.net/mweibiao/article/details/79713698
https://blog.csdn.net/fengyuesong/article/details/122313500?spm=1001.2014.3001.5501