> 文档中心 > 简单模仿下muduo网络库的封装

简单模仿下muduo网络库的封装

文章目录

    • 一、基础socket编程
    • 二、抽象与层次
      • 2.1 InetAddress封装
      • 2.2 Socket封装
      • 2.3 Epoll封装
      • 2.4 Channel封装
      • 2.5 Acceptor封装
      • 2.6 Connection封装
      • 2.7 Threadpool封装
      • 2.8 Eventloop封装
      • 2.9 TCPserver封装
    • 三、小结

一、基础socket编程

网络编程的底层离不开socket,其处理流程表示如下:

int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serv_addr;bzero(&serv_addr, sizeof(serv_addr));serv_addr.sin_family = AF_INET;serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");serv_addr.sin_port = htons(8888);bind(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr));listen(sockfd, SOMAXCONN);struct sockaddr_in clnt_addr;socklen_t clnt_addr_len = sizeof(clnt_addr);bzero(&clnt_addr, sizeof(clnt_addr));int clnt_sockfd = accept(sockfd, (sockaddr*)&clnt_addr, &clnt_addr_len);

当然,目前常用的服务器架构都离不开epoll的帮助,其常用处理逻辑如下:

int epfd = epoll_create(0);while(1){int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);for(int i = 0; i < nfds; ++i){// solve...}}

上述的做法显然是一种C语言的处理方式,并且所有逻辑都放在一起处理,这容易使得我们的程序越来越庞大和臃肿。此时我们需要对程序进行模块化,每一个模块专门处理一个任务,这样可以增加程序的可读性,也可以写出更大庞大、功能更加复杂的程序。

本文主要仿照muduo网络库的底层封装,设计一个烂大街的echo服务器。

二、抽象与层次

2.1 InetAddress封装

新建立服务器时通常要有绑定协议类型、IP、端口等固定操作,采用InetAddress类进行封装。

class InetAddress {  public:    InetAddress() = default;    InetAddress(const char *ip, uint16_t port);    ~InetAddress() = default;    void SetInetAddr(sockaddr_in _addr);    struct sockaddr_in GetAddr();    const char* GetIp();    uint16_t GetPort();private:    struct sockaddr_in addr_;};

这样在后面使用时只需要下面一条语句即可:

InetAddress *addr = new InetAddress("127.0.0.1", 8888);

2.2 Socket封装

socket封装的原因主要在于socke、listen、bind等固定且繁琐操作,于是采用Socket类来进行封装。

class Socket {private:    int fd_;public:    Socket();    explicit Socket(int);    ~Socket();    void Bind(InetAddress *addr);    void Listen();    void Setnonblocking();    bool IsNonBlocking();    void Setreuseaddr();    void Setreuseport();    int Accept(InetAddress *addr);    void Connect(InetAddress *addr);    void Connect(const char *ip, uint16_t port);    int GetFd();};

这样我们只需要这样使用即可:

sock_ = new Socket();sock_->Setreuseaddr();sock_->Setreuseport();sock_->Bind(addr);sock_->Listen();

2.3 Epoll封装

epoll的使用同样离不开几个固定的套路,从epoll_create、epoll_ctl到epoll_wait,我们采用以下方法来封装:

class Epoll {public:    Epoll();    ~Epoll(); void UpdateChannel(Channel * ch);    void DeleteChannel(Channel * ch);    std::vector<Channel*> Poll(int timeout = -1);private:    int epfd_;    struct epoll_event *events_;};

2.4 Channel封装

2.3中主要对epoll的一些操作进行封装,而后续在建立一个新连接时,我们需要将其添加到红黑树中,后续当该连接上发生事件时,我们需要使用不同的处理方式来应对,于是乎,我们采用了一个Channel类,每个Chanel只会对一个fd负责,对不同的事件类型设置不同的处理逻辑。

class Channel {public:    Channel(EventLoop *_loop, int _fd);    ~Channel();    void HandleEvent();    void EnableRead();    int GetFd();    uint32_t GetListenEvents();    uint32_t GetReadyEvents();    bool GetInEpoll();    void SetInEpoll(bool _in = true);    void UseET();    void SetReadyEvents(uint32_t ev);    void SetReadCallback(std::function<void()> const &cb);private:    EventLoop *loop_;    int fd_;    uint32_t listen_events_;    uint32_t ready_events_;    bool in_epoll_;    std::function<void()> read_callback_;    std::function<void()> write_callback_;};

值得注意的是,我们采用function/bind的回调方法作为类和类之间沟通的方法,给每个不同的channel绑定不同的read/write方法,针对不同fd实现对应的处理逻辑,实例如下:

acceptChannel_ = new Channel(loop_, sock_->GetFd());std::function<void()> cb = std::bind(&Acceptor::AcceptConnection, this);acceptChannel_->SetReadCallback(cb);acceptChannel_->EnableRead();

2.5 Acceptor封装

对于每个客户,首先需要做的事都是调用accept()函数接受这个TCP连接,然后将socket文件描述符添加到epoll,当这个IO口有事件发生的时候,再对此TCP连接提供相应的服务。因此,我们可以添加一个Acceptor类,该类拥有一个独特的accept fd,也通过一个独有的Channel负责分发到epoll,该Channel的事件处理函数read_callback_()会调用Acceptor中的接受连接函数AcceptConnection()来新建一个TCP连接。

class Acceptor{public:    explicit Acceptor(EventLoop *loop);    ~Acceptor();    void AcceptConnection();    void SetNewConnectionCallback(std::function<void(Socket*)> const &cb);private:    EventLoop *loop_;    Socket *sock_;    Channel *acceptChannel_;    std::function<void(Socket*)> newConnectionCallback_;};

在Acceptor类中使用的newConnectionCallback_回调则是用来处理新建立的TCP连接的,在其他类中写好回调逻辑即可通过void SetNewConnectionCallback(std::function const &cb)完成目的。

2.6 Connection封装

在2.5节中,我们将accept抽象出来封装成了一个类,在accept后得到的一个新的TCP连接,该连接在四次挥手前将一直存在,于是我们也将一个TCP连接也浅浅的抽象封装一下。

class Connection{public:    Connection(EventLoop *loop, Socket *sock);    ~Connection();    void Read();    void Write();    void SetDeleteConnectionCallback(std::function<void(Socket *)> const &callback);    void SetOnConnectCallback(std::function<void(Connection *)> const &callback);    void Close();    void SetSendBuffer(const char *str);    Buffer *GetReadBuffer();    const char *ReadBuffer();    Buffer *GetSendBuffer();    const char *SendBuffer();    void GetlineSendBuffer();    Socket *GetSocket();    void OnConnect(std::function<void()> fn);private:    EventLoop *loop_;    Socket *sock_;    Channel *channel_;    State state_;    Buffer *readBuffer_;    Buffer *writeBuffer_; std::function<void(Socket *)> deleteConnectionCallback_;    std::function<void(Connection *)> onConnectCallback_;    void ReadNonBlocking();    void WriteNonBlocking();    void ReadBlocking();    void WriteBlocking();};

其中,deleteConnectionCallback_onConnectCallback_两个回调函数则是用于每个连接的释放和业务处理,在TCP连接建立时通过SetDeleteConnectionCallback()SetOnConnectCallback()进行设置。

2.7 Threadpool封装

在服务器设计中,线程池一定是最重要的模块之一,当某个连接上有事件触发时,我们直接将其丢给工作线程去处理,这能够很大程度上提高服务器的性能。

class Threadpool{private:    std::vector<std::thread> threads_;    std::queue<std::function<void()>> tasks_;    std::mutex tasks_mtx_;    std::condition_variable cv_;    bool stop_;public:    explicit Threadpool(int size = std::thread::hardware_concurrency());    ~Threadpool();    template<class F, class... Args>    auto Add(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;};template<class F, class... Args>auto Threadpool::Add(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {    using return_type = typename std::result_of<F(Args...)>::type;    auto task = std::make_shared< std::packaged_task<return_type()> >(     std::bind(std::forward<F>(f), std::forward<Args>(args)...) );    std::future<return_type> res = task->get_future();    { std::unique_lock<std::mutex> lock(tasks_mtx_); // don't allow enqueueing after stopping the pool if(stop_)     throw std::runtime_error("enqueue on stopped ThreadPool"); tasks_.emplace([task](){ (*task)(); });    }    cv_.notify_one();    return res;}Threadpool::Threadpool(int size) : stop_(false){    for(int i = 0; i < size; ++i){ threads_.emplace_back(std::thread([this](){     while(true){  std::function<void()> task;  {      std::unique_lock<std::mutex> lock(tasks_mtx_);      cv_.wait(lock, [this](){   return stop_ || !tasks_.empty();      });      if(stop_ && tasks_.empty()) return;      task = tasks_.front();      tasks_.pop();  }  task();     } }));    }}Threadpool::~Threadpool(){    { std::unique_lock<std::mutex> lock(tasks_mtx_); stop_ = true;    }    cv_.notify_all();    for(std::thread &th : threads_){ if(th.joinable())     th.join();    }}

上述封装的线程池使用右值移动、完美转发等阻止拷贝,另外使用add函数前不需要手动绑定参数,而是直接传递,并且可以得到任务的返回值。

2.8 Eventloop封装

上述类实际上都是对一些底层设置以及抽象流程的封装,但目前服务器是一个顺序的处理结构,当我们的服务器结构越来越庞大、功能越来越复杂、模块越来越多,这种顺序程序设计的思想显然是不能满足需求的。由此引入了两种服务器的开发模式,Reactor和Proactor模式。

本文的服务器模式为Reactor模式,同时给出两种不同的reactor模式,下面以饭店案例来说明:

1、单reactor多线程,1个前台接待,多个服务员,接待员只负责接待。
2、主从reactor多线程,多个前台接待,多个服务员。

本节给出的时主从reactor模式,先来张图说明下:
简单模仿下muduo网络库的封装
也就是说,主reactor负责线程的accept,将得到的新连接交给其余的从reactor进行处理,代码如下:

class EventLoop {public:    EventLoop();    ~EventLoop();    void Loop();    void UpdateChannel(Channel*);private:    Epoll *ep_;    bool quit_;};void EventLoop::Loop(){    while(!quit_){    std::vector<Channel*> chs; chs = ep_->Poll(); for(auto &it : chs) {     it->HandleEvent(); }    }}// 实例如下:acceptor_ = new Acceptor(mainReactor_);int size = std::thread::hardware_concurrency();thpool_ = new Threadpool(size);for(int i = 0; i < size; ++i)    subReactors_.push_back(new EventLoop());for(int i = 0; i < size; ++i){    std::function<void()> sub_loop = std::bind(&EventLoop::Loop, subReactors_[i]);    thpool_->Add(std::move(sub_loop));}

显然,每个reactor都是一个事件处理循环,主从循环都会一直从事着自己的本职工作。

2.9 TCPserver封装

综上所述,我们已经将服务器的核心类进行了封装,但是各个类的管理还是不太合理,这里我们又构造了一个TCPserver类,如下:

class TCPserver{private:    EventLoop *mainReactor_;    Acceptor *acceptor_;    std::map<int, Connection*> connections_;    std::vector<EventLoop*> subReactors_;    Threadpool *thpool_;    std::function<void(Connection *)> onConnectCallback_;public:    explicit TCPserver(EventLoop* loop);    ~TCPserver();    void NewConnection(Socket *sock);    void DeleteConnection(Socket *sock);    void OnConnect(std::function<void(Connection *)> fn);};

三、小结

本文主要尝试对muduo底层封装进行探究,很多地方可能理解的不是很到位,希望同学们谅解并指出问题所在哈!

完整代码:https://download.csdn.net/download/hjlogzw/85264548

参考博客:
https://zhuanlan.zhihu.com/p/347779760
https://blog.csdn.net/mweibiao/article/details/79713698
https://blog.csdn.net/fengyuesong/article/details/122313500?spm=1001.2014.3001.5501