Tomcat NIO(7)-Poller

时间:2022-07-24
本文章向大家介绍Tomcat NIO(7)-Poller,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

上一篇文章里我们主要介绍了 tomcat NIO 中的 acceptor 线程,其中包括了server 监听 socket 的初始化,端口绑定,acceptor 线程的启动,接受连接请求,将请求事件注册到 poller 线程。在这里我们主要介绍 poller 线程。

tomcat NIO 架构中会有 poller 线程,每一个 poller 实例都有一个 NIO selector对象,主要用于监测注册在原始 scoket 上的事件是否发生。每一个 poller 实例同时也是会拥有一个事件队列实例 SynchronizedQueue<PollerEvent>,用来存放发生的事件,一般该事件队列的 items 由前一篇文章介绍的 acceport 线程放入。对于 poller thread,主要包括如下 items:

  • 启动 poller 线程
  • 添加事件到事件队列
  • 对原始 socket 注册事件
  • poller 线程核心逻辑
  • poller 线程的等待与唤醒

启动poller线程:

poller 线程的启动主要在以前文章中介绍的架构类 NioEndpoint 的 startInternal 方法中完成,相关核心源代码如下:

public void startInternal() throws Exception {
      //here we ignore other code section which not related with poller in this method to save space
      // Start poller thread
      poller = new Poller();
      Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
      pollerThread.setPriority(threadPriority);
      pollerThread.setDaemon(true);
      pollerThread.start();
}
  • 本代码基于 tomcat v 9.0.21,发现其中 poller thread 数目始终为 1。
  • 在tomcat 8 及以前的版本中,可以通过 pollerThreadCount 配置 poller thread 的数目。

添加事件到事件队列

该工作主要由 Poller 类的 register 方法和 addEvent 方法完成,核心源码如下:

private Selector selector
private AtomicLong wakeupCounter = new AtomicLong(0);
private final SynchronizedQueue<PollerEvent> events =new SynchronizedQueue<>();

public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
      socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
      PollerEvent r = null;
      if (eventCache != null) {
          r = eventCache.pop();
      }
      if (r == null) {
          r = new PollerEvent(socket, OP_REGISTER);
      } else {
          r.reset(socket, OP_REGISTER);
      }
      addEvent(r);
  }

private void addEvent(PollerEvent event) {
     events.offer(event);
      if (wakeupCounter.incrementAndGet() == 0) {
           selector.wakeup();
      }
}
  • Poller 类有 SynchronizedQueue<PollerEvent> 队列,用来存放发生的事件。
  • register() 方法一般由以前文章介绍的 acceptor 线程调用,对原始 socket 注册 open_read 事件,然后创建 OP_REGISTER 类型的 poller 事件,交由方法 addEvent() 处理。
  • addEvent() 方法直接调用队列对象 offer() 方法,将 poller 事件加入队列之中。同时根据 wakeupCounter 增加之后的值是否为 0 来决定是否唤醒 selector(详细会在 poller 线程的等待与唤醒部分讲解)。

对原始socket注册事件

该工作主要由 Poller 对象的 events() 方法以及 PollerEvent 对象的 run() 方法完成,核心源码如下:

//Poller
public boolean events() {
     boolean result = false;
     PollerEvent pe = null;
     for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
         result = true;
         try {
             pe.run();
             pe.reset();
             if (running && !paused && eventCache != null) {
                 eventCache.push(pe);
             }
         } catch ( Throwable x ) {
             log.error(sm.getString("endpoint.nio.pollerEventError"), x);
         }
     }
     return result;
}
//PollerEvent 
public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x);
        }
    } else {
        final SelectionKey key = socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
        try {
            if (key == null) {
                try {
                    socket.socketWrapper.close();
                } catch (Exception ignore) {
                }
            } else {
                final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                if (socketWrapper != null) {
                    int ops = key.interestOps() | interestOps;
                    socketWrapper.interestOps(ops);
                    key.interestOps(ops);
                } else {
                    socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
                }
            }
        } catch (CancelledKeyException ckx) {
            try {
                socket.getSocketWrapper().getPoller().cancelledKey(key, socket.getSocketWrapper());
            } catch (Exception ignore) {}
        }
    }
}
  • Poller 对象实例的 event() 方法会遍历实例中 SynchronizedQueue<PollerEvent> 事件队列中的所有 PollerEvent 事件对象,然后依次调用 PollerEvent 的 run() 方法。同时 reset 事件,以方便后续重用事件对象。
  • PollerEvent 的 run() 方法会根据不同的 event 类型对原始 socket 去注册事件,当 event 为 OP_REGISTER 时,会利用 java NIO API 对原始 socket 去注册 OP_READ 事件,如果不是 OP_REGISTER 则追加当前的 event 事件。根据以前文章,在 acceptor 线程中建立连接之后注册到 Poller 线程中的事件就是OP_REGISTER , 所以是 acceptor 线程建立原始 scoket 连接,然后 Poller 线程对原始 socket 注册 OP_READ 事件。

Poller线程核心逻辑

该工作主要由 Poller 类的 run() 方法和 processKey() 方法以及之前文章介绍的AbstractEndpoint 的 processSocket() 方法完成,核心源码如下:

public void run() {
    while (true) {
        boolean hasEvents = false;
        try {
            if (!close) {
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
            continue;
        }
        if (keyCount == 0) {
            hasEvents = (hasEvents | events());
        }
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            if (socketWrapper == null) {
                iterator.remove();
            } else {
                iterator.remove();
                processKey(sk, socketWrapper);
            }
        }
        timeout(keyCount,hasEvents);
    }
    getStopLatch().countDown();
}
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
    try {
        if (close) {
            cancelledKey(sk, socketWrapper);
        } else if (sk.isValid() && socketWrapper != null) {
            if (sk.isReadable() || sk.isWritable()) {
                if (socketWrapper.getSendfileData() != null) {
                    processSendfile(sk, socketWrapper, false);
                } else {
                    unreg(sk, socketWrapper, sk.readyOps());
                    boolean closeSocket = false;
                    if (sk.isReadable()) {
                        if (socketWrapper.readOperation != null) {
                            if (!socketWrapper.readOperation.process()) {
                                closeSocket = true;
                            }
                        } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (socketWrapper.writeOperation != null) {
                            if (!socketWrapper.writeOperation.process()) {
                                closeSocket = true;
                            }
                        } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk, socketWrapper);
                    }
                }
            }
        } else {
            cancelledKey(sk, socketWrapper);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk, socketWrapper);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
    }
}
//AbstractEndpoint
public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = null;
        if (processorCache != null) {
            sc = processorCache.pop();
        }
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
  • run() 方法是一个循环来处理整个 poller 线程的逻辑。
  • run() 方法会先调用前面介绍的 event() 方法来对队列中所有原始 socket 去注册事件。
  • 然后再调用 NIO API selector.selectNow() 和 selector.select() 方法来监听是否有 event() 方法注册的事件发生(这里会涉及到 Poller 线程的阻塞与唤醒这种比较优雅的设计,详细在后面文章单独讲解)。
  • 如果整个 poller 是关闭了,则做超时处理(关闭原始 socket),关闭 selector,跳出整个循环。
  • 如果 selector 有检测到事件发生,则对所有发生的事件调用 poller 的 processKey() 方法。
  • 在每次循环的最后调用 poller 的 timeout() 方法处理是否超时。
  • 对于 poller 实例的 processKey() 方法会去检查 SelectionKey 的合理性,在可读可写的基础上去调用 AbstractEndpoint 的 processSocket() 方法。
  • 对于 AbstractEndpoint 具体实现类对象之中的 processSocket() 方法,会利用 tomcat 的 io 线程池(通过调用getExecutor方法得到线程池)来运行 SocketProcessor对象实例的 run() 方法。也就是说把SocketWrapper 对象实例委托给 SocketProcessor实例的 run() 方法,在 tomcat io 线程池中运行。

目前先写到这里,下一篇文章里我们继续介绍 tomcat NIO 中 poller 线程的阻塞与唤醒。