Learning the muduo network library, EventLoop (Part 4): the EventLoopThread and EventLoopThreadPool classes

Date: 2022-04-21

1. EventLoopThread (the IO thread class)

Any thread that creates and runs an EventLoop is called an IO thread.

The IO thread is not necessarily the main thread.

muduo's concurrency model is one loop per thread + thread pool (a computation thread pool).

For convenience, the EventLoopThread class is defined; it encapsulates an IO thread.

EventLoopThread creates a thread; in the thread function it creates an EventLoop object and calls EventLoop::loop.

Multiple IO threads can be managed with an IO thread pool; the corresponding class is EventLoopThreadPool.

 class EventLoopThread : boost::noncopyable
{
public:
    typedef boost::function<void(EventLoop *)> ThreadInitCallback;

    EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback());
    ~EventLoopThread();

    // Starts the thread, which then becomes the IO thread (implementation shown below).
    // thread_.start() runs threadFunc(), which is bound in the constructor's initializer list:
    // thread_(boost::bind(&EventLoopThread::threadFunc, this))
    EventLoop *startLoop();

private:
    void threadFunc();      // thread function

    EventLoop *loop_;           // points to the EventLoop object owned by the IO thread
    bool exiting_;
    Thread thread_;
    MutexLock mutex_;
    Condition cond_;
    ThreadInitCallback callback_;       // callback invoked before EventLoop::loop starts the event loop
};

EventLoopThread::~EventLoopThread()
{
    exiting_ = true;
    loop_->quit();      // make the IO thread's loop() return, which in turn makes the IO thread exit
    thread_.join();
}
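
For reference, the constructor mentioned in the comments above looks roughly like this (a sketch based on muduo's implementation: it only stores the callback and binds threadFunc to the Thread object; the EventLoop itself is created later, on the new thread's own stack):

EventLoopThread::EventLoopThread(const ThreadInitCallback &cb)
    : loop_(NULL),
      exiting_(false),
      thread_(boost::bind(&EventLoopThread::threadFunc, this)),    // threadFunc will run on the new thread
      mutex_(),
      cond_(mutex_),        // the condition variable is guarded by mutex_
      callback_(cb)         // invoked in threadFunc, before loop() starts
{
}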

EventLoop *EventLoopThread::startLoop()
{
    assert(!thread_.started());
    thread_.start(); // start the thread; from this point two threads are running:
    // the one that called EventLoopThread::startLoop(), and the one executing EventLoopThread::threadFunc() (the IO thread)
    {
        MutexLockGuard lock(mutex_);
        while (loop_ == NULL)
        {
            cond_.wait(); // this function must return loop_, so wait here until the IO thread has created it
        }
    }

    return loop_;
}

void EventLoopThread::threadFunc()
{
    EventLoop loop;

    if (callback_)
    {
        callback_(&loop);
    }

    {
        MutexLockGuard lock(mutex_);
        // Normally the EventLoopThread object is destructed first; its destructor calls loop_->quit(),
        // which makes loop.loop() return. threadFunc then exits, the stack object loop is destroyed and
        // loop_ becomes dangling, but by that point nothing accesses loop through loop_ any more, so this is fine.
        loop_ = &loop;
        cond_.notify();
    }

    loop.loop();
    //assert(exiting_);
}

Test code:

#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>

#include <stdio.h>
#include <unistd.h>     // getpid(), sleep()

using namespace muduo;
using namespace muduo::net;

void runInThread()
{
    printf("runInThread(): pid = %d, tid = %dn",
           getpid(), CurrentThread::tid());
}

int main()
{
    printf("main(): pid = %d, tid = %dn",
           getpid(), CurrentThread::tid());

    EventLoopThread loopThread;
    EventLoop *loop = loopThread.startLoop();
    // asynchronous call: runInThread is added to the IO thread that owns loop, and that thread executes it
    loop->runInLoop(runInThread);
    sleep(1);
    // runAfter also calls runInLoop internally, so this is asynchronous as well: the IO thread adds a 2-second timer
    loop->runAfter(2, runInThread);
    sleep(3);
    // ~EventLoopThread() will call loop_->quit();

    printf("exit main().n");
}

The output is as follows:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test06
20131108 03:29:12.749530Z  2628 TRACE IgnoreSigPipe Ignore SIGPIPE - EventLoop.cc:51
main(): pid = 2628, tid = 2628
20131108 03:29:12.753135Z  2629 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131108 03:29:12.753794Z  2629 TRACE EventLoop EventLoop created 0xB7415F44 in thread 2629 - EventLoop.cc:76
20131108 03:29:12.754266Z  2629 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131108 03:29:12.754707Z  2629 TRACE loop EventLoop 0xB7415F44 start looping - EventLoop.cc:108
20131108 03:29:12.755088Z  2629 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 03:29:12.756033Z  2629 TRACE printActiveChannels {5: IN }  - EventLoop.cc:271
runInThread(): pid = 2628, tid = 2629
20131108 03:29:13.755730Z  2629 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 03:29:13.756388Z  2629 TRACE printActiveChannels {5: IN }  - EventLoop.cc:271
20131108 03:29:15.755858Z  2629 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 03:29:15.757316Z  2629 TRACE printActiveChannels {4: IN }  - EventLoop.cc:271
20131108 03:29:15.757469Z  2629 TRACE readTimerfd TimerQueue::handleRead() 1 at 1383881355.757345 - TimerQueue.cc:62
runInThread(): pid = 2628, tid = 2629
exit main().
20131108 03:29:16.755942Z  2629 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 03:29:16.755988Z  2629 TRACE printActiveChannels {5: IN }  - EventLoop.cc:271
20131108 03:29:16.756003Z  2629 TRACE loop EventLoop 0xB7415F44 stop looping - EventLoop.cc:133
simba@ubuntu:~/Documents/build/debug/bin$

The main thread is not an IO thread. From the previous article, timerfd_ = 4 and wakeupFd_ = 5. The main thread calls loop->runInLoop(runInThread); since the caller is not the IO thread, runInLoop() calls queueInLoop() to append runInThread to the pending queue and then wakeup()s the IO thread. The IO thread takes runInThread() out of the queue in doPendingFunctors() and executes it; as the output shows, the IO thread's tid differs from the main thread's. loop->runAfter(2, runInThread); follows the same path and also needs a wakeup; at that moment it only runs runAfter(), which registers a 2-second timer. When the 2 seconds expire, timerfd_ becomes readable: handleRead() is called first, and then the callback runInThread() is executed. So why is there another readable event on wakeupFd_ after "exit main()." is printed? Because the stack object EventLoopThread is destructed, and its destructor calls loop_->quit(); since quit() is not called from the IO thread, it too has to wake the IO thread up, so that the IO thread returns from poll, re-checks while (!quit_), and exits.
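
The cross-thread dispatch described above is implemented by EventLoop::runInLoop/queueInLoop together with wakeupFd_; a simplified sketch of that mechanism (condensed from the EventLoop code covered in the earlier articles, error handling omitted) looks like this:

void EventLoop::runInLoop(const Functor &cb)
{
    if (isInLoopThread())
    {
        cb();               // called from the IO thread itself: run synchronously
    }
    else
    {
        queueInLoop(cb);    // called from another thread: queue it and wake the IO thread
    }
}

void EventLoop::queueInLoop(const Functor &cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(cb);
    }
    // If the caller is not the IO thread (or the IO thread is currently inside
    // doPendingFunctors()), write to wakeupFd_ so that poll returns and the
    // queued functors are executed promptly.
    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}

void EventLoop::wakeup()
{
    uint64_t one = 1;
    ::write(wakeupFd_, &one, sizeof one);   // wakeupFd_ is the per-loop eventfd
}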

2. EventLoopThreadPool (the IO thread pool class)

The job of the IO thread pool is to start a number of IO threads and keep each of them running its event loop.

class EventLoopThreadPool : boost::noncopyable
{
public:
    typedef boost::function<void(EventLoop *)> ThreadInitCallback;

    EventLoopThreadPool(EventLoop *baseLoop);
    ~EventLoopThreadPool();
    void setThreadNum(int numThreads)
    {
        numThreads_ = numThreads;
    }
    void start(const ThreadInitCallback &cb = ThreadInitCallback());

    // If loops_ is empty, getNextLoop() returns baseLoop_;
    // otherwise it picks an EventLoop in round-robin (RR) fashion (see the sketch after this class)
    EventLoop *getNextLoop();


private:

    EventLoop *baseLoop_;   // the same EventLoop that the Acceptor belongs to
    bool started_;
    int numThreads_;        // number of threads, not counting the mainReactor
    int next_;          // index of the EventLoop to pick when a new connection arrives
    boost::ptr_vector<EventLoopThread> threads_;        // list of IO threads
    std::vector<EventLoop *> loops_;                    // list of EventLoops
};
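
The round-robin selection described above is roughly the following (a sketch along the lines of muduo's implementation; getNextLoop() is only called from the baseLoop_ thread, e.g. from TcpServer::newConnection()):

EventLoop *EventLoopThreadPool::getNextLoop()
{
    baseLoop_->assertInLoopThread();
    EventLoop *loop = baseLoop_;    // with no extra IO threads, everything stays on baseLoop_

    if (!loops_.empty())
    {
        // round-robin over the EventLoops owned by the pool's IO threads
        loop = loops_[next_];
        ++next_;
        if (static_cast<size_t>(next_) >= loops_.size())
        {
            next_ = 0;
        }
    }
    return loop;
}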

void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
    assert(!started_);
    baseLoop_->assertInLoopThread();

    started_ = true;

    for (int i = 0; i < numThreads_; ++i)
    {
        EventLoopThread *t = new EventLoopThread(cb);
        threads_.push_back(t);
        loops_.push_back(t->startLoop());   // start the EventLoopThread; cb is invoked before its loop enters the event loop
    }
    if (numThreads_ == 0 && cb)
    {
        // there is only one EventLoop (baseLoop_); invoke cb before it enters the event loop
        cb(baseLoop_);
    }
}

Under the mainReactor + ThreadPool (subReactors) model used here, baseLoop_ is the same EventLoop as the loop_ member of TcpServer and Acceptor: the mainReactor handles the listening socket, while events on connected sockets are handed to the subReactors in the pool in round-robin fashion. Note that when a TcpConnection object is created, it has to be bound to one of the subReactors, as follows:

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    loop_->assertInLoopThread();
    // pick an EventLoop in round-robin fashion
    EventLoop *ioLoop = threadPool_->getNextLoop();

    InetAddress localAddr(sockets::getLocalAddr(sockfd));

    // connName (the connection's name) is built from the server name and a connection index; its construction is omitted in this excerpt
    TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                            connName,
                                            sockfd,
                                            localAddr,
                                            peerAddr));

    connections_[connName] = conn;

    ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));

}

Test program:

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>
#include <unistd.h>     // getpid()

using namespace muduo;
using namespace muduo::net;

class TestServer
{
public:
    TestServer(EventLoop *loop,
               const InetAddress &listenAddr, int numThreads)
        : loop_(loop),
          server_(loop, listenAddr, "TestServer"),
          numThreads_(numThreads)
    {
        server_.setConnectionCallback(
            boost::bind(&TestServer::onConnection, this, _1));
        server_.setMessageCallback(
            boost::bind(&TestServer::onMessage, this, _1, _2, _3));
        server_.setThreadNum(numThreads);
    }

    void start()
    {
        server_.start();
    }

private:
    void onConnection(const TcpConnectionPtr &conn)
    {
        if (conn->connected())
        {
            printf("onConnection(): new connection [%s] from %sn",
                   conn->name().c_str(),
                   conn->peerAddress().toIpPort().c_str());
        }
        else
        {
            printf("onConnection(): connection [%s] is downn",
                   conn->name().c_str());
        }
    }

    void onMessage(const TcpConnectionPtr &conn,
                   const char *data,
                   ssize_t len)
    {
        printf("onMessage(): received %zd bytes from connection [%s]n",
               len, conn->name().c_str());
    }

    EventLoop *loop_;
    TcpServer server_;
    int numThreads_;
};


int main()
{
    printf("main(): pid = %dn", getpid());

    InetAddress listenAddr(8888);
    EventLoop loop;

    TestServer server(&loop, listenAddr, 4);
    server.start();

    loop.loop();
}

There are 5 IO threads in total here: the main thread (mainReactor) plus the 4 pool threads (subReactors). server.start() starts the 4 pool threads and makes the mainReactor start listening:

// Calling this function more than once is harmless.
// It can be called from threads other than the IO thread.
void TcpServer::start()
{
    if (!started_)
    {
        started_ = true;
        threadPool_->start(threadInitCallback_);
    }

    if (!acceptor_->listenning())
    {
        // get_pointer returns the raw pointer
        loop_->runInLoop(
            boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
    }
}

Start two telnet clients and connect them to the server; type aaaa in one and ddddd in the other. The server output is as follows:

simba@ubuntu:~/Documents/build/debug/bin$ ./reactor_test10
main(): pid = 8628
20131108 11:33:15.190620Z  8628 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131108 11:33:15.191246Z  8628 TRACE EventLoop EventLoop created 0xBFB77D50 in thread 8628 - EventLoop.cc:62
20131108 11:33:15.191568Z  8628 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131108 11:33:15.192270Z  8629 TRACE updateChannel fd = 9 events = 3 - EPollPoller.cc:104
20131108 11:33:15.192625Z  8629 TRACE EventLoop EventLoop created 0xB7484F44 in thread 8629 - EventLoop.cc:62
20131108 11:33:15.192927Z  8629 TRACE updateChannel fd = 10 events = 3 - EPollPoller.cc:104
20131108 11:33:15.193356Z  8629 TRACE loop EventLoop 0xB7484F44 start looping - EventLoop.cc:94
20131108 11:33:15.193759Z  8630 TRACE updateChannel fd = 12 events = 3 - EPollPoller.cc:104
20131108 11:33:15.194100Z  8630 TRACE EventLoop EventLoop created 0xB6AFEF44 in thread 8630 - EventLoop.cc:62
20131108 11:33:15.194398Z  8630 TRACE updateChannel fd = 13 events = 3 - EPollPoller.cc:104
20131108 11:33:15.194786Z  8630 TRACE loop EventLoop 0xB6AFEF44 start looping - EventLoop.cc:94
20131108 11:33:15.195135Z  8631 TRACE updateChannel fd = 15 events = 3 - EPollPoller.cc:104
20131108 11:33:15.195534Z  8631 TRACE EventLoop EventLoop created 0xB60FEF44 in thread 8631 - EventLoop.cc:62
20131108 11:33:15.207467Z  8631 TRACE updateChannel fd = 16 events = 3 - EPollPoller.cc:104
20131108 11:33:15.208169Z  8631 TRACE loop EventLoop 0xB60FEF44 start looping - EventLoop.cc:94
20131108 11:33:15.208940Z  8632 TRACE updateChannel fd = 18 events = 3 - EPollPoller.cc:104
20131108 11:33:15.209576Z  8632 TRACE EventLoop EventLoop created 0xB58FDF44 in thread 8632 - EventLoop.cc:62
20131108 11:33:15.210087Z  8632 TRACE updateChannel fd = 19 events = 3 - EPollPoller.cc:104
20131108 11:33:15.210445Z  8628 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20131108 11:33:15.210750Z  8628 TRACE loop EventLoop 0xBFB77D50 start looping - EventLoop.cc:94
20131108 11:33:15.211122Z  8632 TRACE loop EventLoop 0xB58FDF44 start looping - EventLoop.cc:94
20131108 11:33:18.958878Z  8628 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 11:33:18.959167Z  8628 TRACE printActiveChannels {6: IN}  - EventLoop.cc:257
20131108 11:33:18.959226Z  8628 INFO  TcpServer::newConnection [TestServer] - new connection[TestServer:0.0.0.0:8888#1] from 127.0.0.1:56411 - TcpServer.cc:93
20131108 11:33:18.959262Z  8628 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x8C84F98 fd=20 - TcpConnection.cc:62
20131108 11:33:18.959277Z  8628 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20131108 11:33:18.959300Z  8628 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20131108 11:33:18.959322Z  8628 TRACE newConnection [5] usecount=3 - TcpServer.cc:122
20131108 11:33:18.959343Z  8629 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 11:33:18.959365Z  8629 TRACE printActiveChannels {10: IN }  - EventLoop.cc:257
20131108 11:33:18.959378Z  8629 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:78
20131108 11:33:18.959409Z  8629 TRACE updateChannel fd = 20 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56411
20131108 11:33:18.959433Z  8629 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:83
20131108 11:33:23.111546Z  8628 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 11:33:23.111628Z  8628 TRACE printActiveChannels {6: IN }  - EventLoop.cc:257
20131108 11:33:23.111662Z  8628 INFO  TcpServer::newConnection [TestServer] - new connection[TestServer:0.0.0.0:8888#2] from 127.0.0.1:56412 - TcpServer.cc:93
20131108 11:33:23.111680Z  8628 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#2] at 0x8C85128 fd=21 - TcpConnection.cc:62
20131108 11:33:23.111693Z  8628 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20131108 11:33:23.111722Z  8628 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20131108 11:33:23.111746Z  8628 TRACE newConnection [5] usecount=3 - TcpServer.cc:122
20131108 11:33:23.111769Z  8630 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 11:33:23.111792Z  8630 TRACE printActiveChannels {13: IN}  - EventLoop.cc:257
20131108 11:33:23.111805Z  8630 TRACE connectEstablished [3] usecount=3 - TcpConnection.cc:78
20131108 11:33:23.111813Z  8630 TRACE updateChannel fd = 21 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#2] from 127.0.0.1:56412
20131108 11:33:23.111836Z  8630 TRACE connectEstablished [4] usecount=3 - TcpConnection.cc:83
20131108 11:33:25.219778Z  8631 TRACE poll  nothing happended - EPollPoller.cc:74
20131108 11:33:25.219829Z  8632 TRACE poll  nothing happended - EPollPoller.cc:74
20131108 11:33:28.969971Z  8629 TRACE poll  nothing happended - EPollPoller.cc:74
20131108 11:33:33.119151Z  8630 TRACE poll  nothing happended - EPollPoller.cc:74
20131108 11:33:33.119202Z  8628 TRACE poll  nothing happended - EPollPoller.cc:74
20131108 11:33:33.754975Z  8629 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 11:33:33.755031Z  8629 TRACE printActiveChannels {20: IN}  - EventLoop.cc:257
20131108 11:33:33.755042Z  8629 TRACE handleEvent [6] usecount=2 - Channel.cc:67
onMessage(): received 6 bytes from connection [TestServer:0.0.0.0:8888#1]
20131108 11:33:33.755128Z  8629 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20131108 11:33:35.230224Z  8631 TRACE poll  nothing happended - EPollPoller.cc:74
20131108 11:33:35.230274Z  8632 TRACE poll  nothing happended - EPollPoller.cc:74
20131108 11:33:36.540663Z  8630 TRACE poll 1 events happended - EPollPoller.cc:65
20131108 11:33:36.540715Z  8630 TRACE printActiveChannels {21: IN}  - EventLoop.cc:257
20131108 11:33:36.540727Z  8630 TRACE handleEvent [6] usecount=2 - Channel.cc:67
onMessage(): received 7 bytes from connection [TestServer:0.0.0.0:8888#2]
20131108 11:33:36.540780Z  8630 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20131108 11:33:43.129769Z  8628 TRACE poll  nothing happended - EPollPoller.cc:74
20131108 11:33:43.765633Z  8629 TRACE poll  nothing happended - EPollPoller.cc:74

A process already has file descriptors 0, 1 and 2 open to begin with.

When each Reactor's EventLoop object is constructed, it uses EPollPoller by default, which contributes EPollPoller::epollfd_.

In addition there are two channels (EventLoop::timerQueue_'s timerfd_ and EventLoop::wakeupFd_)

whose readable events are watched by poll() from the start, and they stay watched until the event loop ends.

So every Reactor has these 3 fds of its own.

The mainReactor additionally has Acceptor::acceptSocket_.sockfd_ (the listening fd) and Acceptor::idleFd_ (opened on /dev/null).

For the program above, in the mainReactor: epollfd_ = 3, timerfd_ = 4, wakeupFd_ = 5, sockfd_ = 6, idleFd_ = 7.

The triples (8, 9, 10), (11, 12, 13), (14, 15, 16) and (17, 18, 19) belong to the 4 IO threads respectively.

Connected sockets therefore can only start from fd 20, and they are distributed evenly across the 4 subReactors, which handle their events (readable events, including incoming data and connection close; writable events, when the kernel send buffer is not full; and error events).
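
For reference, each loop's timerfd_ and wakeupFd_, and the Acceptor's idleFd_, are created with roughly the following calls (a sketch of what muduo does; error handling omitted):

#include <sys/timerfd.h>
#include <sys/eventfd.h>
#include <fcntl.h>

// TimerQueue: one timerfd per EventLoop
int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);

// EventLoop: one eventfd per loop; wakeup() writes to it to break out of poll/epoll_wait
int wakeupFd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

// Acceptor: a descriptor reserved on /dev/null, used to handle EMFILE gracefully
// (close idleFd_, accept and immediately close the excess connection, then reopen /dev/null)
int idleFd = ::open("/dev/null", O_RDONLY | O_CLOEXEC);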

When the first client connects, sockfd_ becomes readable and the mainReactor calls TcpServer::newConnection(), which creates a TcpConnection object and binds it to the first IO thread in the pool. Inside that function, ioLoop->runInLoop() wakes up the first IO thread, i.e. that thread's wakeupFd_ (10) becomes readable; after handleEvent() the thread goes on to doPendingFunctors() and executes TcpConnection::connectEstablished(). The rest of the flow, including receiving data (fd = 20 becomes readable), is described in EventLoop (Part 3).
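
TcpConnection::connectEstablished(), which the subReactor runs from doPendingFunctors() after being woken up, roughly does the following (a sketch based on the muduo version used here): it registers the connected fd with the subReactor's poller and then invokes the user's connection callback, which is why the log shows updateChannel fd = 20 followed by the onConnection() output on the IO thread:

void TcpConnection::connectEstablished()
{
    loop_->assertInLoopThread();        // runs on the subReactor (ioLoop) thread
    assert(state_ == kConnecting);
    setState(kConnected);
    channel_->enableReading();          // register the connected fd with this loop's poller (updateChannel fd = 20)
    connectionCallback_(shared_from_this());    // user callback: onConnection()
}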

Of course, if we pass numThreads_ = 0 (or do not set it at all), there is only the mainReactor, and it has to handle both the listening socket and all the connected sockets itself.

References:

UNP (Unix Network Programming)

muduo manual.pdf

Linux Multithreaded Server Programming: Using the muduo C++ Network Library