> 技术文档 > [muduo网络库]-muduo库TcpConnection类解析

[muduo网络库]-muduo库TcpConnection类解析

本贴用于记录muduo库的学习过程,以下是关于TcpConnection类的个人理解。

TcpConnection封装了一次连接,包括建立连接销毁链接的方法,以及各种事件的处理函数。

下面是TcpConnection大概创建流程:

重要成员变量:

EventLoop *loop_; // 这里是baseloop还是subloop由TcpServer中创建的线程数决定 若为多Reactor 该 loop_指向subloop 若为单Reactor 该loop_指向baseloopconst std::string name_;std::atomic_int state_;bool reading_;//连接是否在监听读事件std::unique_ptr socket_;std::unique_ptr channel_;const InetAddress localAddr_;const InetAddress peerAddr_; // 这些回调TcpServer也有 用户通过写入TcpServer注册 TcpServer再将注册的回调传递给TcpConnection TcpConnection再将回调注册到Channel中 ConnectionCallback connectionCallback_; // 有新连接时的回调 MessageCallback messageCallback_; // 有读写消息时的回调 WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调 HighWaterMarkCallback highWaterMarkCallback_; // 高水位回调 CloseCallback closeCallback_; // 关闭连接的回调 size_t highWaterMark_; // 高水位阈值 // 数据缓冲区 Buffer inputBuffer_; // 接收数据的缓冲区 Buffer outputBuffer_; // 发送数据的缓冲区 用户send向outputBuffer_发

loop_:这个loop由tcpserver分配,表示由这个loop管理这个tcpconnection对象的channel。

state_:这里为什么用atomic_int定义呢?我的理解是这样:对于连接状态的修改问题,必须是要是原子性的。举个例子:state_=0,现在state_++,希望得到状态1,在++过程中线程2也要使用state_++,但是如果没有atomic_int,他可能会取到state_的旧值0先进行++,导致线程1得到state_++的值为2了,出现错误。

socket_:连接套接字。

channel_:包含各种回调函数,socket_一触发事件就会通过channel_来调用。
localAddr_:服务器的网络地址。

peerAddr_:客户端的网络地址。

highWaterMark_:高水位阈值。应用写数据很快,但是内核发送数据就慢得多,如果有大量数据要发送,缓冲区数据不能及时发送出去,导致越来越满,所以设置了一个阈值,一达到就会调用回调函数,提醒我们缓冲区快满了,写的慢一点。

重要成员函数

TcpConnection::TcpConnection(EventLoop *loop, const std::string &nameArg, int sockfd, const InetAddress &localAddr, const InetAddress &peerAddr) : loop_(CheckLoopNotNull(loop)) , name_(nameArg) , state_(kConnecting) , reading_(true) , socket_(new Socket(sockfd)) , channel_(new Channel(loop, sockfd)) , localAddr_(localAddr) , peerAddr_(peerAddr) , highWaterMark_(64 * 1024 * 1024) // 64M{ // 下面给channel设置相应的回调函数 poller给channel通知感兴趣的事件发生了 channel会回调相应的回调函数 channel_->setReadCallback( std::bind(&TcpConnection::handleRead, this, std::placeholders::_1)); channel_->setWriteCallback( std::bind(&TcpConnection::handleWrite, this)); channel_->setCloseCallback( std::bind(&TcpConnection::handleClose, this)); channel_->setErrorCallback( std::bind(&TcpConnection::handleError, this)); LOG_INFO(\"TcpConnection::ctor[%s] at fd=%d\\n\", name_.c_str(), sockfd); socket_->setKeepAlive(true);}

构造函数中主要就是设置回调。

void TcpConnection::send(const std::string &buf){ if (state_ == kConnected) { if (loop_->isInLoopThread()) // 这种是对于单个reactor的情况 用户调用conn->send时 loop_即为当前线程 { sendInLoop(buf.c_str(), buf.size()); } else { loop_->runInLoop( std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size())); } }}

send()用于发送数据。有人可能会好奇为什么这里要判断一下loop是否属于当前线程,如果不属于还要把这个事件发送到属于的线程操作。为什么要由相应的线程操作,这个很好理解:避免资源竞争。现在搭建可能会有一个疑惑:这个send函数为什么需要判断loop是否属于当前线程。tcpconnection对象的channel一直由相应线程监听,为什么会出现需要别的线程处理的情况呢?

我们来看一下send函数的调用过程:

套接字有写事件发生->调用channel的handleEvent中的handleEventsWithGuard中的readcallback_-> 最终调用EchoServer中的onMessage(),答案已经显而易见了,肯定是echoserver不在channel所在的线程,这样一来问题就解决了。

这种构造特别巧妙,可以把业务处理和其他部分分离开来,我们可以在onmessage()里面自制响应方法。

下面是发送逻辑的具体部分:

void TcpConnection::sendInLoop(const void *data, size_t len){ ssize_t nwrote = 0; size_t remaining = len; bool faultError = false; if (state_ == kDisconnected) // 之前调用过该connection的shutdown 不能再进行发送了 { LOG_ERROR(\"disconnected, give up writing\"); } // 表示channel_第一次开始写数据或者缓冲区没有待发送数据 if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { nwrote = ::write(channel_->fd(), data, len); if (nwrote >= 0) { remaining = len - nwrote; if (remaining == 0 && writeCompleteCallback_) { // 既然在这里数据全部发送完成,就不用再给channel设置epollout事件了 loop_->queueInLoop(  std::bind(writeCompleteCallback_, shared_from_this())); } } else // nwrote channel,调用channel对应注册的writeCallback_回调方法, * channel的writeCallback_实际上就是TcpConnection设置的handleWrite回调, * 把发送缓冲区outputBuffer_的内容全部发送完成 **/ if (!faultError && remaining > 0) { // 目前发送缓冲区剩余的待发送的数据的长度 size_t oldLen = outputBuffer_.readableBytes(); if (oldLen + remaining >= highWaterMark_ && oldLen queueInLoop( std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)); } outputBuffer_.append((char *)data + nwrote, remaining); if (!channel_->isWriting()) { channel_->enableWriting(); // 这里一定要注册channel的写事件 否则poller不会给channel通知epollout } }}

这部分比较重要的是在缓冲区数据没全部发送到内核上时,必须注册channel的写事件。

void TcpConnection::shutdown(){ if (state_ == kConnected) { setState(kDisconnecting); loop_->runInLoop( std::bind(&TcpConnection::shutdownInLoop, this)); }}void TcpConnection::shutdownInLoop(){ if (!channel_->isWriting()) // 说明当前outputBuffer_的数据全部向外发送完成 { socket_->shutdownWrite(); }}
void Socket::shutdownWrite(){ if (::shutdown(sockfd_, SHUT_WR) < 0) { LOG_ERROR(\"shutdownWrite error\"); }}

可以看到这里提供了一种关闭写端的方法。引用@吃我一个平底锅文中的一句话:陈硕大佬原话是这样的:Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。因为TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。

tcpconnection的连接和销毁

// 连接建立void TcpConnection::connectEstablished(){ setState(kConnected); channel_->tie(shared_from_this()); channel_->enableReading(); // 向poller注册channel的EPOLLIN读事件 // 新连接建立 执行回调 connectionCallback_(shared_from_this());}// 连接销毁void TcpConnection::connectDestroyed(){ if (state_ == kConnected) { setState(kDisconnected); channel_->disableAll(); // 把channel的所有感兴趣的事件从poller中删除掉 connectionCallback_(shared_from_this()); } channel_->remove(); // 把channel从poller中删除掉}

因为连接关闭和错误是默认监听的,而写事件只在缓冲区数据没全部写入内核时才监听,所以建立连接时只用监听读事件。

下面是读、写、关闭、错误的回调函数:

// 读是相对服务器而言的 当对端客户端有数据到达 服务器端检测到EPOLLIN 就会触发该fd上的回调 handleRead取读走对端发来的数据void TcpConnection::handleRead(Timestamp receiveTime){ int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) // 有数据到达 { // 已建立连接的用户有可读事件发生了 调用用户传入的回调操作onMessage shared_from_this就是获取了TcpConnection的智能指针 messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) // 客户端断开 { handleClose(); } else // 出错了 { errno = savedErrno; LOG_ERROR(\"TcpConnection::handleRead\"); handleError(); }}void TcpConnection::handleWrite(){ if (channel_->isWriting()) { int savedErrno = 0; ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno); if (n > 0) { outputBuffer_.retrieve(n);//从缓冲区读取reable区域的数据移动readindex下标 if (outputBuffer_.readableBytes() == 0) { channel_->disableWriting(); if (writeCompleteCallback_) {  // TcpConnection对象在其所在的subloop中 向pendingFunctors_中加入回调  loop_->queueInLoop( std::bind(writeCompleteCallback_, shared_from_this())); } if (state_ == kDisconnecting) {  shutdownInLoop(); // 在当前所属的loop中把TcpConnection删除掉 } } } else { LOG_ERROR(\"TcpConnection::handleWrite\"); } } else { LOG_ERROR(\"TcpConnection fd=%d is down, no more writing\", channel_->fd()); }}void TcpConnection::handleClose(){ LOG_INFO(\"TcpConnection::handleClose fd=%d state=%d\\n\", channel_->fd(), (int)state_); setState(kDisconnected); channel_->disableAll(); TcpConnectionPtr connPtr(shared_from_this()); connectionCallback_(connPtr); closeCallback_(connPtr); // 执行关闭连接的回调 执行的是TcpServer::removeConnection回调方法 // must be the last line}void TcpConnection::handleError(){ int optval; socklen_t optlen = sizeof optval; int err = 0; if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0) { err = errno; } else { err = optval; } LOG_ERROR(\"TcpConnection::handleError name:%s - SO_ERROR:%d\\n\", name_.c_str(), err);}

最后强调一个最重要的生命周期问题,TcpConnection继承自enable_shared_from_this,生命周期由shared_ptr托管,在正常情况(非正常情况可以去看我channel的那期)下tcpconnection能不能正常销毁,我们从对象创建开始分析。

1.首先他在tcpserver中创建,并放入哈希表,引用计数为1。

2.客户端断开连接,在channel::handleEvent中weak_ptr被提为shared_ptr引用计数+1,总数为2。

3.进入函数handleclose内,创建一个shared_ptr,引用计数+1,总数为3。

4.在closecallback内的TcpServer::removeConnectionInLoop中,哈希表的erase()销毁了一个shared_ptr,引用计数-1,总数为2,在TcpServer::removeConnectionInLoop函数处理完之后,第3步的ptr离开作用域销毁,引用计数-1,总数为1。

5.closecallback结束后,handleEvent也结束,第二步的ptr也离开作用域销毁,引用计数-1,总数为0,Tcpconnection对象成功销毁,并且成功清理了相关资源。


以上就是我对TcpConnection类的理解,欢迎大家交流讨论。