Analyzing when EPOLLOUT is triggered, starting from Tars Connection::send

Date: 2022-06-10
This article analyzes when EPOLLOUT is triggered by walking through Tars's TC_EpollServer::NetThread::Connection::send, covering the main code path and the points worth noting.

First, Tars uses edge-triggered (ET) mode by default:

// constructor
TC_Epoller(bool bEt = true);

// wraps epoll_ctl
void TC_Epoller::ctrl(int fd, long long data, __uint32_t events, int op)
{
    struct epoll_event ev;
    ev.data.u64 = data;
    if(_et)
    {
        ev.events   = events | EPOLLET;
    }
    else
    {
        ev.events   = events;
    }

    epoll_ctl(_iEpollfd, op, fd, &ev);
}
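
As a usage sketch (armConnection, connFd and uid below are illustrative, and add/mod are assumed to be TC_Epoller's thin wrappers around ctrl), registering a connection under ET and then forcing an EPOLLOUT wake-up looks roughly like this:

// Usage sketch only: armConnection(), connFd and uid are illustrative, not Tars
// code. add()/mod() are assumed to be TC_Epoller's thin wrappers around ctrl(),
// so every mask below implicitly gets EPOLLET OR'ed in.
#include <sys/epoll.h>
#include "util/tc_epoller.h"   // header path assumed from the TarsCpp util library

void armConnection(TC_Epoller& epoller, int connFd, long long uid)
{
    // Normally the connection only waits for incoming requests.
    epoller.add(connFd, uid, EPOLLIN);

    // To force one EPOLLOUT wake-up, re-arm the fd with EPOLLOUT in the mask.
    // In Tars, NetThread::send does this on a notify fd, and epoll_wait then
    // dispatches to NetThread::processPipe (step 1 below).
    epoller.mod(connFd, uid, EPOLLIN | EPOLLOUT);
}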

Now let's walk through the send function:

int TC_EpollServer::NetThread::Connection::send(const string& buffer, const string &ip, uint16_t port, bool byEpollOut)
{
    const bool isUdp = (_lfd == -1);
    if(isUdp)
    {
        int iRet = _sock.sendto((const void*) buffer.c_str(), buffer.length(), ip, port, 0);
        if(iRet < 0)
        {
            _pBindAdapter->getEpollServer()->error("[TC_EpollServer::Connection] send [" + _ip + ":" + TC_Common::tostr(_port) + "] error");
            return -1;
        }
        return 0;
    }

    if (byEpollOut)
    {// 3. When the kernel send buffer becomes writable again, EPOLLOUT fires automatically; epoll_wait runs NetThread::processNet to send the remaining data
        int bytes = this->send(_sendbuffer);
        if (bytes == -1) 
        { 
            _pBindAdapter->getEpollServer()->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by peer."); 
            return -1; 
        } 

        this->adjustSlices(_sendbuffer, bytes);
        _pBindAdapter->getEpollServer()->info("byEpollOut [" + _ip + ":" + TC_Common::tostr(_port) + "] send bytes " + TC_Common::tostr(bytes)); 
    }
    else
    {// 4. If the next outgoing packet arrives while the previous data has not all been sent, append the new packet to the unsent buffer
        const size_t kChunkSize = 8 * 1024 * 1024;
        if (!_sendbuffer.empty()) 
        { 
            TC_BufferPool* pool = _pBindAdapter->getEpollServer()->getNetThreadOfFd(_sock.getfd())->_bufferPool;
            // avoid too big chunk
            for (size_t chunk = 0; chunk * kChunkSize < buffer.size(); chunk ++)
            {
                size_t needs = std::min<size_t>(kChunkSize, buffer.size() - chunk * kChunkSize);

                TC_Slice slice = pool->Allocate(needs);
                ::memcpy(slice.data, buffer.data() + chunk * kChunkSize, needs);
                slice.dataLen = needs;

                _sendbuffer.push_back(slice);
            }
        } 
        else //1. NetThread::send forces an EPOLLOUT trigger; epoll_wait runs NetThread::processPipe, the first attempt to send this packet
        { 
            int bytes = this->tcpSend(buffer.data(), buffer.size()); 
            if (bytes == -1) 
            { 
                _pBindAdapter->getEpollServer()->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by peer."); 
                return -1; 
            } 
            else if (bytes < static_cast<int>(buffer.size())) 
            { //2. Write the data that could not be sent in full into _sendbuffer
                const char* remainData = &buffer[bytes];
                const size_t remainLen = buffer.size() - static_cast<size_t>(bytes);
            
                TC_BufferPool* pool = _pBindAdapter->getEpollServer()->getNetThreadOfFd(_sock.getfd())->_bufferPool;
                // avoid too big chunk
                for (size_t chunk = 0; chunk * kChunkSize < remainLen; chunk ++)
                {
                    size_t needs = std::min<size_t>(kChunkSize, remainLen - chunk * kChunkSize);

                    TC_Slice slice = pool->Allocate(needs);
                    ::memcpy(slice.data, remainData + chunk * kChunkSize, needs);
                    slice.dataLen = needs;

                    _sendbuffer.push_back(slice);
                }
                // end
                _pBindAdapter->getEpollServer()->info("EAGAIN[" + _ip + ":" + TC_Common::tostr(_port) +
                        ", to sent bytes " + TC_Common::tostr(remainLen) +
                        ", total sent " + TC_Common::tostr(buffer.size()));
            } 
        } 
    }

    size_t toSendBytes = 0;
    for (const auto& slice : _sendbuffer)
    {
        toSendBytes += slice.dataLen;
    }

    if (toSendBytes >= 8 * 1024)
    {
        _pBindAdapter->getEpollServer()->info("big _sendbuffer > 8K");
        size_t iBackPacketBuffLimit = _pBindAdapter->getBackPacketBuffLimit();

        if(iBackPacketBuffLimit != 0 && toSendBytes >= iBackPacketBuffLimit)
        {
            _pBindAdapter->getEpollServer()->error("send [" + _ip + ":" + TC_Common::tostr(_port) + "] buffer too long close.");
            clearSlices(_sendbuffer);
            return -2;
        }
    }


    // the connection needs to be closed
    if(_bClose && _sendbuffer.empty())
    {
        _pBindAdapter->getEpollServer()->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by user.");
        return -2;
    }

    return 0;
}

1. NetThread::send forces an EPOLLOUT trigger; epoll_wait runs NetThread::processPipe, which makes the first attempt to send the packet.

2. Data that cannot be sent in one go is sliced into TC_Slice chunks and written into _sendbuffer.

3. The previous write filled the kernel send buffer, so the socket became unwritable; once the peer reads data and the buffer becomes writable again, EPOLLOUT fires automatically, epoll_wait runs NetThread::processNet, and the remaining data is sent with writev (a sketch of this flush path follows the list).

4. If the next packet arrives while the previous slices have not all been sent, the new packet is sliced as well and appended to the unsent _sendbuffer.
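
The flush path referred to in step 3 (this->send(_sendbuffer) and adjustSlices) is not shown above. Below is a minimal sketch of what such a writev-based flush and slice adjustment plausibly look like; it is an assumption, not the Tars source, and the Slice struct merely stands in for TC_Slice with the data/dataLen fields used above:

// Sketch under assumptions: not the Tars source, just a plausible shape of the
// writev flush + adjustSlices pair described in steps 3 and 4.
// "Slice" stands in for TC_Slice; its data/dataLen fields mirror the usage above.
#include <sys/uio.h>
#include <cerrno>
#include <cstddef>
#include <vector>

struct Slice { void* data; size_t dataLen; };

// Gather the pending slices into iovecs and hand them to writev in one call.
// Returns bytes written, 0 on EAGAIN/EWOULDBLOCK, -1 on a real error.
static int flushSlices(int fd, const std::vector<Slice>& slices)
{
    std::vector<iovec> iov;
    iov.reserve(slices.size());
    for (const auto& s : slices)
        iov.push_back(iovec{s.data, s.dataLen});

    ssize_t n = ::writev(fd, iov.data(), static_cast<int>(iov.size()));
    if (n < 0)
        return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
    return static_cast<int>(n);
}

// Drop the first `sent` bytes from the slice list; a partially written slice
// keeps its unsent tail (the real adjustSlices also returns finished slices
// to the buffer pool, which is omitted here).
static void dropSentBytes(std::vector<Slice>& slices, size_t sent)
{
    while (sent > 0 && !slices.empty())
    {
        Slice& front = slices.front();
        if (sent >= front.dataLen)
        {
            sent -= front.dataLen;
            slices.erase(slices.begin());          // slice fully sent
        }
        else
        {
            front.data    = static_cast<char*>(front.data) + sent;
            front.dataLen -= sent;
            sent = 0;
        }
    }
}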

To summarize, in ET mode EPOLLOUT is triggered in two situations:

1. A forced trigger: epoll_ctl (via TC_Epoller::ctrl) sets EPOLLOUT in the event mask.

2. An automatic trigger: after a write fills the kernel send buffer, EPOLLOUT fires once the buffer becomes writable again.
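
To make the second (automatic) trigger concrete, here is a minimal standalone sketch, independent of the Tars code above. epfd and connFd are assumptions: an epoll instance that already contains a connected, non-blocking TCP socket.

// Minimal standalone sketch (not Tars code): write until EAGAIN, then rely on
// the ET EPOLLOUT triggers described above.
#include <sys/epoll.h>
#include <sys/socket.h>
#include <cerrno>
#include <cstddef>

void sendAll(int epfd, int connFd, const char* data, size_t len)
{
    size_t sent = 0;
    while (sent < len)
    {
        ssize_t n = ::send(connFd, data + sent, len - sent, 0);
        if (n > 0) { sent += static_cast<size_t>(n); continue; }
        if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) return;   // real error

        // Kernel send buffer is full. Re-arm the fd with EPOLLOUT: under ET,
        // EPOLL_CTL_MOD would fire immediately if the socket were writable
        // (trigger 1); here it fires once the peer reads and the buffer has
        // room again (trigger 2). A real server would keep EPOLLIN in the mask.
        epoll_event ev{};
        ev.events  = EPOLLOUT | EPOLLET;
        ev.data.fd = connFd;
        ::epoll_ctl(epfd, EPOLL_CTL_MOD, connFd, &ev);

        epoll_event out{};
        ::epoll_wait(epfd, &out, 1, -1);   // wakes up on the EPOLLOUT event
    }
}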

PS: EPOLLOUT-related questions in LT mode

An interview question from Tencent backend development (refer: http://kimi.it/515.html): using the Linux epoll model in level-triggered mode, the writable event keeps firing as long as the socket is writable. How should this be handled?

The most common approach: add the socket to epoll only when there is data to write, and wait for the writable event. When the writable event arrives, call write or send to send the data. Once all the data has been written, remove the socket from epoll.

The drawback of this approach is that even a small amount of data requires adding the socket to epoll and removing it again after the write, which carries some overhead.

An improved approach: do not add the socket to epoll at first; when there is data to write, call write or send directly. Only if the call returns EAGAIN is the socket added to epoll, and the remaining data is written under epoll's control; once everything has been sent, the socket is removed from epoll again.

The advantage of this approach is that when there is not much data, the epoll event handling is avoided entirely, which improves efficiency.
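
A minimal sketch of this improved approach (the Conn struct and the function names are illustrative, not from the interview answer or any particular library):

// Sketch of the improved LT approach: write immediately; only when the kernel
// buffer fills do we ask epoll for EPOLLOUT, and we remove it again as soon as
// the pending data is flushed. The fd is assumed to have been added to epfd
// with EPOLLIN beforehand.
#include <sys/epoll.h>
#include <sys/socket.h>
#include <cerrno>
#include <string>

struct Conn
{
    int         fd;
    std::string pending;          // bytes the kernel has not accepted yet
    bool        wantWrite = false;
};

static void updateEpollOut(int epfd, Conn& c)
{
    epoll_event ev{};
    ev.data.fd = c.fd;
    ev.events  = EPOLLIN | (c.wantWrite ? EPOLLOUT : 0);   // LT mode, no EPOLLET
    ::epoll_ctl(epfd, EPOLL_CTL_MOD, c.fd, &ev);
}

// Called both when a new message is queued and from the EPOLLOUT handler.
static void trySend(int epfd, Conn& c, const std::string& msg = std::string())
{
    c.pending += msg;
    while (!c.pending.empty())
    {
        ssize_t n = ::send(c.fd, c.pending.data(), c.pending.size(), 0);
        if (n > 0) { c.pending.erase(0, static_cast<size_t>(n)); continue; }
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) break;   // kernel buffer full
        return;   // a real error: let the caller close the connection
    }
    // Register EPOLLOUT only while data is pending; drop it once drained.
    bool needOut = !c.pending.empty();
    if (needOut != c.wantWrite)
    {
        c.wantWrite = needOut;
        updateEpollOut(epfd, c);
    }
}

Because the socket stays out of the EPOLLOUT set while writes succeed immediately, the common small-message case costs no extra epoll_ctl calls at all.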