Tomcat NIO(6)-Acceptor

时间:2022-07-23
本文章向大家介绍Tomcat NIO(6)-Acceptor,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

上一篇文章里我们主要介绍了 tomcat NIO 的整体架构,以及在这个架构下的各个线程,在这里我们主要介绍 acceptor 线程。

在tomcat NIO 中会有一个独立的 acceptor 线程,主要负责接监听端口,接受连接请求,并将请求事件注册到 poller 线程中,主要包括如下:

  • 初始化 server socket,并绑定监听端口
  • 启动 acceptor 线程
  • 接受连接请求
  • 将请求事件注册到 poller 线程

初始化server socket绑定监听端口

该工作主要在以前文章中介绍的架构类 NioEndpoint 的 initServerSocket 方法中完成,相关源代码如下:

protected void initServerSocket() throws Exception {
      if (!getUseInheritedChannel()) {
          serverSock = ServerSocketChannel.open();
          socketProperties.setProperties(serverSock.socket());
          InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
          serverSock.socket().bind(addr,getAcceptCount());
      } else {
          // Retrieve the channel provided by the OS
          Channel ic = System.inheritedChannel();
          if (ic instanceof ServerSocketChannel) {
              serverSock = (ServerSocketChannel) ic;
          }
          if (serverSock == null) {
              throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
          }
      }
        serverSock.configureBlocking(true); //mimic APR behavior
 }
 
private boolean useInheritedChannel = false;

public void setUseInheritedChannel(boolean useInheritedChannel) {
    this.useInheritedChannel = useInheritedChannel; 
}
public boolean getUseInheritedChannel() {
    return useInheritedChannel; 
}
  • getUseInheritedChannel() 方法默认为 false ,所以会走 if 分支。
  • 在上述的 if 分支里, 会利用 java NIO 对象 ServerSocketChannel 创建 server socket ,绑定监听地址和端口,设置 socket 的 backlog 以及其他属性。
  • 最后调用 serverSock.configureBlocking(true) 来设置监听的 socket 为阻塞 scoket ,即该 socket 在 accept 的时候,如果没有连接就使当前线程阻塞。

启动acceptor线程

该工作的完成主要是在以前文章中介绍的架构类 AbstractEndpoint 的 startAcceptorThread 方法中进行,相关源代码如下:

protected void startAcceptorThread() {
    acceptor = new Acceptor<>(this);
    String threadName = getName() + "-Acceptor";
    acceptor.setThreadName(threadName);
    Thread t = new Thread(acceptor, threadName);
    t.setPriority(getAcceptorThreadPriority());
    t.setDaemon(getDaemon());
    t.start();
 }
  • acceptor 线程的核心工作类为 Acceptor。
  • 该方法设置 acceptor 线程的优先级和守护线程属性,并启动线程。

接受连接请求

该工作主要在上面介绍的 Acceptor 类的 run 方法中完成,相关核心源代码如下:

try {
    socket = endpoint.serverSocketAccept();
    } catch (Exception ioe) {
        endpoint.countDownConnection();
        if (endpoint.isRunning()) {
            errorDelay = handleExceptionWithDelay(errorDelay);
            throw ioe;
         } else {
              break;
         }
    }

errorDelay = 0;
// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
    // setSocketOptions() will hand the socket off to
    // an appropriate processor if successful
        if (!endpoint.setSocketOptions(socket)) {
            endpoint.closeSocket(socket);
        }
} else {
        endpoint.destroySocket(socket);
}
  • 调用 endpoint.serverSocketAccept() 方法来获取已经完成 tcp 三次握手的socket 连接,如果没有发生异常,并且 endpoint 正在 running 状态,同时也没有暂停,那么该逻辑就会把针对新进入连接所创建的原始 java socket 对象交由 endpoint.setSocketOptions(socket) 方法去处理。
  • 根据上面介绍的 socket 初始化过程,server 端的监听 socket 是被设置为阻塞 socket ,所以endpoint.serverSocketAccept() 方法在没有可用连接的时候 acceptor 线程是阻塞的。

将请求事件注册到poller线程

该工作主要在以前文章中介绍的 NioEndpoint 的 setSocketOptions 方法和 Poller类中的 register 方法中完成,相关源代码如下:

//NioEndpoint
protected boolean setSocketOptions(SocketChannel socket) {
    try {
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        NioChannel channel = null;
        if (nioChannels != null) {
            channel = nioChannels.pop();
        }
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        NioSocketWrapper socketWrapper = new NioSocketWrapper(channel, this);
        channel.setSocketWrapper(socketWrapper);
        socketWrapper.setReadTimeout(getConnectionTimeout());
        socketWrapper.setWriteTimeout(getConnectionTimeout());
        socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        socketWrapper.setSecure(isSSLEnabled());
        poller.register(channel, socketWrapper);
        return true;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
    }
    return false;
}

//Poller
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
      socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
      PollerEvent r = null;
      if (eventCache != null) {
          r = eventCache.pop();
      }
      if (r == null) {
          r = new PollerEvent(socket, OP_REGISTER);
      } else {
          r.reset(socket, OP_REGISTER);
      }
      addEvent(r);
  }
  • 上述逻辑会在 setSocketOptions() 方法里进行构造 SocketBufferHandler 对象,主要设定读写 buffer 大小,以及是否使用 DirectBuffer (默认使用)。
  • 构造以前文章中介绍的 NioChannel 对象,该对象封装了基于原始 socket 去进行构造的 SocketBufferHandler 对象。
  • 构造以前文章中介绍的 NioSocketWrapper 对象,该对象封装了上面构造的 NioChannel 对象和当前 NioEndpoint 对象。
  • 调用以前文章介绍的 poller 的 register(channel, socketWrapper) 方法将上面构造的 NioChannel 对象和 NioSocketWrapper 对象注册到 poller 线程的事件队列里。
  • 由poller 对象的 register() 方法分析可知,会注册 OP_REGISTER 类型的PollerEvent 到 poller 对象的事件队列里。

目前先写到这里,下一篇文章里我们继续介绍 tomcat NIO 中的 poller 线程。