muduo TcpServer粗略过程

时间:2022-07-26
本文章向大家介绍muduo TcpServer粗略过程,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

使用 muduoexamplesasiochatserver.cc 作例子

首先需要知道:

EventLoop:一个事件分发器类,拥有 Poller 对象,事件处理函数是 loop(),在这里捕获注册的 Channel 事件,并调用相应的回调函数。

while (!quit_)
  {
    ...
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    ...
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      // 通过 revents_ 来处理响应的事件
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    ...
  }

下面这几个类,都拥有一个 EventLoop * 对象,他们的事件注册,就注册到他们分配到的 EventLoop * 对象中。

Channel:一个封装 fd 的类,方便向 EventLoop 注册事件.比如,可读,可写事件。

Acceptor:负责接受连接的类,拥有一个Channel对象,向 EventLoop 注册为可读,则当有人来连接的时候,就能调用相应的回调。

TcpConnecton:也拥有一个 Channel 对象,已建立连接以后,再生成的对象,用来处理,连接之后的操作,发送,接受,断开。

然后开始来看吧,客户使用的代码,极其简单,就建立起一个 ChatServer . 这里的 loop 为 baseloop 处于主线程的条件下。

    ChatServer server(&loop, serverAddr);
    server.start();
    loop.loop();

再来看看 server.start() 干了什么.先在构造函数中,初始化了几个成员,threadPool_,acceptor_。设置了acceptor_的回调.

1.threadPool_.start(),开启创建 N 个 EventLoopThread 个对象,即开了了 N 个 EventLoop 对象,并执行他们的 loop() 函数。

2.loop_runInLoop(..listen..),runInLoop 简单说明一下,就是保证你的函数执行的环境是跟你的 EventLoop 处于同一个线程中,保证同步。 最终,这个函数会调用 acceptor_.listen(),这个函数的本质就是向他的 loop 注册可读事件,可读的话就是有人来连接啦。可以自己跟踪下。

TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    hostport_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
    threadPool_(new EventLoopThreadPool(loop)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
    // Acceptor::handleRead函数中会回调用TcpServer::newConnection
    // _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}
void TcpServer::start()
{
    ....
    threadPool_->start(threadInitCallback_);
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
    ....
}

到这里为止,就已经注册了 accept fd 的可读事件,那么当有连接到来的时候,就会触发,在 loop() 函数中,然后去调用响应的回调函数,在 TcpServer 构造函数中可以看到,注册了 acceptor_ 可读回调是 TcpServer::newConnecton() ,所以,就来看看这个函数又看了什么。

1.可以看到,通过传过来的参数,创建一个 TcpConnecton 对象,并存储到 connections_ 中,这是一个 map 容器。threadPool_->getNextLoop() 就是在线程池中分配一个 loop 给他,负载均衡吧。如果之前设置的线程数为0,则大家共用主线程的 loop.

2.注册,响应的回调函数,可读,可写,有连接(就是调用用户设置的有连接来的回调函数),关闭。

3.最后调用,ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));这个函数里面,实际就是将 TcpConnection 中的 Channel 注册为可读,这样就能开始接收 client 的数据。然后也可以调用 TcpConnection 的 send 来发数据。

这里的接受和发送设计一个 Buffer 类,这是一个 缓存类,非阻塞I/O必备,这里就不说先。

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
// 按照轮叫的方式选择一个EventLoop
  EventLoop* ioLoop = threadPool_->getNextLoop();
  
  // 记录那个 nextConnId_ 有什么意义吗? 
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
           
  // getLocalAddr listenAddr 不应该就是 localAddr 了吗?
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  // 每个TcpConnection 里面都有一个 Channel
  // 每一个 channel 里面 有一个 loop 
  // 每一个 loop 里面有一个 poller_(),每一个loop不同线程
  // 每一个 loop 里面有一个 vector<Channel *>
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  // connections_ is a map<string,TcpConnectionPtr>
  connections_[connName] = conn;
  
  conn->setConnectionCallback(connectionCallback_);//改变connection 改为用户定义的,跟构造函数的不一样
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe//跨线程的调用
// TcpConnection所对应的通道加入到Poller关注

  // 嗯,这个时候这里调用 ioLoop->runInLoop() 的话,似乎就不是他那里的环境
  // 没错,这里的 ioloop 是在其他线程中创建的。
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//各自线程调用
}

当已经建立连接好以后,我们再来看看,在最初的 loop() 函数中的 Channel::handleEvent() 函数.可以看到,通过判断事件来种类,来调用响应的回调函数。

   if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }

大致逻辑是这样,服务端的过程比客户端的过程要复杂一些,毕竟服务端要管理这么多个连接。客户端的过程大家可以自己去看一下。本文只是粗略的解释了服务端的过程,其中涉及的多线程应该注意的事项还无功力能总结。