lettuce和hbase中对netty的使用你都了解吗?

时间:2022-07-22
本文章向大家介绍lettuce和hbase中对netty的使用你都了解吗?,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

之前的两篇文章中咱们有详细地聊过关于lettuce的pipeline以及spring-data-redis对其封装的细节。本篇紧接着上面篇以connectionPoolingProvider为入口,对lettuce基于netty处理IO事件的线程池进行进一步地分析。

lettuce EventLoopGroup初始化

在使用lettuce作为redis连接池时,在上一节中我们知道,lettuce中维护连接有两种使用连接池的方式,目前一种已经废弃,另一种大家正在使用的版本是apache commons pool。咱们来回顾下。

从连接池获取连接的入口

org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider#getConnection:

@Override
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
            // 不存在就创建连接池
            return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
                    poolConfig, false);
        });
        try {
            // 从连接池中获取
            StatefulConnection<?, ?> connection = pool.borrowObject();
            // 放入map中
            poolRef.put(connection, pool);
            return connectionType.cast(connection);
        } catch (Exception e) {
            throw new PoolException("Could not get a resource from the pool", e);
        }
    }

上面的代码中获取连接的核心是connectionProvider.getConnection(connectionType),我们先来看一下connectionProvider的分类列表如下:

LettucePoolingConnectionProvider只是一个代理,在它的内部维护着具体的创建连接的provider。在我的场景下的这个provider是StandaloneConnectionProvider。

我们来看一下org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider#getConnection(java.lang.Class)方法:

@Override
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        ------------省略部分--------------
        if (StatefulConnection.class.isAssignableFrom(connectionType)) {
            return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
                    .orElseGet(() -> client.connect(codec)));
        }
        -------------------------
    }

在这里是使用RedisClient获取连接的方法,接着一起来看下io.lettuce.core.RedisClient#connect(io.lettuce.core.codec.RedisCodec<K,V>)方法:

 public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {
        checkForRedisURI();
        return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));
    }

在这里connectStandaloneAsync方法返回的是一个ConnectionFuture<StatefulRedisConnection<K, V>>对象,是一个维户着我们的redis连接的Future。来看下io.lettuce.core.RedisClient#connectStandaloneAsync方法:

private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
            RedisURI redisURI, Duration timeout) {
        -------------------
        // 创建DefaultEndpoint对象
        DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources);
        RedisChannelWriter writer = endpoint;
        if (CommandExpiryWriter.isSupported(clientOptions)) {
            writer = new CommandExpiryWriter(writer, clientOptions, clientResources);
        }
        // 创建连接对象
        StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, codec, timeout);
       // 进行连接
        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, codec, endpoint, redisURI,
                () -> new CommandHandler(clientOptions, clientResources, endpoint));
         ------------------------------------------
        return future;
    }

这里我们来分析下真正去建立连接的过程,主要逻辑在io.lettuce.core.RedisClient#connectStatefulAsync方法中,代码如下:

private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
            RedisCodec<K, V> codec, Endpoint endpoint,
            RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
        ConnectionBuilder connectionBuilder;
        if (redisURI.isSsl()) {
       -------------省略部分代码----------------
        } else {
            // 创建connection builder对象
            connectionBuilder = ConnectionBuilder.connectionBuilder();
        }
        connectionBuilder.connection(connection);
        connectionBuilder.clientOptions(clientOptions);
        connectionBuilder.clientResources(clientResources);
        connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
        // 在这里会创建netty client Bootstrap对象
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
        // 在这里根据channel type来初始化eventLoopGroup对象
        channelType(connectionBuilder, redisURI);
       ----------------------省略部分代码--------------------
        return sync.thenApply(channelHandler -> (S) connection);
    }

这里主要需要分析个方法:connectionBuilder方法和channelType方法。

1.io.lettuce.core.AbstractRedisClient#connectionBuilder方法:

     protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
               RedisURI redisURI) {
           // 创建bootstrap对象
           Bootstrap redisBootstrap = new Bootstrap();
           // 设置高水位线
           redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
          // 设置低水平线
           redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
          // buf的allocator
           redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
           SocketOptions socketOptions = getOptions().getSocketOptions();
          // 设置bootstrap连接超时时间
           redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
                   Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
           if (LettuceStrings.isEmpty(redisURI.getSocket())) {
               redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
               redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
           }
            // 设置超时
           connectionBuilder.timeout(redisURI.getTimeout());
           // 设置密码
           connectionBuilder.password(redisURI.getPassword());
           connectionBuilder.bootstrap(redisBootstrap);
           connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
           connectionBuilder.socketAddressSupplier(socketAddressSupplier);
       }

这里主要是初始化 netty client的相关对象,并对其中一些参数进行设置。可以看到整个connectionBuilder对象中维护着连接所需要的所有信息。

1.io.lettuce.core.AbstractRedisClient#channelType方法

    protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) {
           LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null");
           connectionBuilder.bootstrap().group(getEventLoopGroup(connectionPoint));
           if (connectionPoint.getSocket() != null) {
               NativeTransports.assertAvailable();
               connectionBuilder.bootstrap().channel(NativeTransports.domainSocketChannelClass());
           } else {
               connectionBuilder.bootstrap().channel(Transports.socketChannelClass());
           }
       }

这个方法的目的也很简单,主要是设置执行netty 网络IO操作的线程池 eventLoopGroup。在真正开始分析eventLoopGroup之前,先来看一下io.lettuce.core.Transports#eventLoopGroupClass方法:

     static Class<? extends EventLoopGroup> eventLoopGroupClass() {
           if (NativeTransports.isSocketSupported()) {
               return NativeTransports.eventLoopGroupClass();
           }
           return NioEventLoopGroup.class;
       }

这个方法的主要目的是确定使用哪种类型的NioEventLoopGroup。在io.lettuce.core.Transports.NativeTransports#eventLoopGroupClass方法内部:

   static Class<? extends EventLoopGroup> eventLoopGroupClass() {
       if (KqueueProvider.isAvailable()) {
           return KqueueProvider.eventLoopGroupClass();
       }
       return EpollProvider.eventLoopGroupClass();
   }

这个方法会先判断kqueue与epoll provider的可用性,然后来决定eventLoopGroup的class类型是io.netty.channel.kqueue.KQueueEventLoopGroup还是io.netty.channel.epoll.EpollEventLoopGroup。具体判断的逻辑在EpollProvider和KqueueProvider的静态代码块,有兴趣的可以自己去分析一下,这里简单地看下kqueue的:

如果既没有epoll也没有kqueue,那么会使用NioEventLoopGroup。

获取EventLoopGroup

此时再回过头来看下io.lettuce.core.AbstractRedisClient#getEventLoopGroup方法代码如下:

      private synchronized EventLoopGroup getEventLoopGroup(ConnectionPoint connectionPoint) {
           if (connectionPoint.getSocket() == null && !eventLoopGroups.containsKey(Transports.eventLoopGroupClass())) {
              // 先进行对应eventLoopGroupClass的初始化
               eventLoopGroups.put(Transports.eventLoopGroupClass(),clientResources.eventLoopGroupProvider().allocate(Transports.eventLoopGroupClass()));
           }
           if (connectionPoint.getSocket() != null) {
               NativeTransports.assertAvailable();
               Class<? extends EventLoopGroup> eventLoopGroupClass = NativeTransports.eventLoopGroupClass();
               if (!eventLoopGroups.containsKey(NativeTransports.eventLoopGroupClass())) {
                   eventLoopGroups
                           .put(eventLoopGroupClass, clientResources.eventLoopGroupProvider().allocate(eventLoopGroupClass));
               }
           }
           if (connectionPoint.getSocket() == null) {
               // 返回
               return eventLoopGroups.get(Transports.eventLoopGroupClass());
           }
           if (connectionPoint.getSocket() != null) {
               NativeTransports.assertAvailable();
               return eventLoopGroups.get(NativeTransports.eventLoopGroupClass());
           }
           throw new IllegalStateException("This should not have happened in a binary decision. Please file a bug.");
       }

这里我们主要关注clientResources.eventLoopGroupProvider().allocate(Transports.eventLoopGroupClass())方法,其中clientResources.eventLoopGroupProvider()方法会从clientResources中获取到eventLoopGroupProvider,然后使用这个provider去申请对应class类型的eventLoopGroup。

在DefaultClientResources中的provider类型是DefaultEventLoopGroupProvider,至于为什么这样,下文会有分析。我们来看下io.lettuce.core.resource.DefaultEventLoopGroupProvider#allocate方法:

  @Override
    public <T extends EventLoopGroup> T allocate(Class<T> type) {
        synchronized (this) {
            logger.debug("Allocating executor {}", type.getName());
            // 将引用放入到外部容器中去,方便计数统计
            return addReference(getOrCreate(type));
        }
    }

io.lettuce.core.resource.DefaultEventLoopGroupProvider#getOrCreate:
private <T extends EventLoopGroup> T getOrCreate(Class<T> type) {
 ---------------省略部分代码------------------
        if (!eventLoopGroups.containsKey(type)) {
            // 如果不存在,则创建eventLoopGroup
            eventLoopGroups.put(type, createEventLoopGroup(type, numberOfThreads));
        }
        return (T) eventLoopGroups.get(type);
    }

io.lettuce.core.resource.DefaultEventLoopGroupProvider#createEventLoopGroup:
 public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads) {
        logger.debug("Creating executor {}", type.getName());
        if (DefaultEventExecutorGroup.class.equals(type)) {
            return new DefaultEventExecutorGroup(numberOfThreads, new DefaultThreadFactory("lettuce-eventExecutorLoop", true));
        }
        if (NioEventLoopGroup.class.equals(type)) {
            return new NioEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-nioEventLoop", true));
        }
        if (EpollProvider.isAvailable() && EpollProvider.isEventLoopGroup(type)) {
            return EpollProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-epollEventLoop", true));
        }
        if (KqueueProvider.isAvailable() && KqueueProvider.isEventLoopGroup(type)) {
            return KqueueProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-kqueueEventLoop", true));
        }
        throw new IllegalArgumentException(String.format("Type %s not supported", type.getName()));
    }

这里的getEventLoopGroup方法会生成真正供netty使用的EventLoopGroup,可能是nio、epoll、kqueue中的一种。先判断下当前环境是否支持epoll和kequeue,如果支持则会生成对应的eventLoopGroup,默认使用的是NioEventLoopGroup。没错,这个eventLoopGroup就是netty client进行io操作的eventLoopGroup。

DefaultClientResources 的 DefaultEventExecutorGroup

这里先来看一下clientResources的初始化,org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration#lettuceClientResources代码:

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(ClientResources.class)
public DefaultClientResources lettuceClientResources() {
   return DefaultClientResources.create();
}

io.lettuce.core.resource.DefaultClientResources#create:
  public static DefaultClientResources create() {
        return builder().build();
    }

io.lettuce.core.resource.DefaultClientResources.Builder#build:
@Override
public DefaultClientResources build() {
    return new DefaultClientResources(this);
}

protected DefaultClientResources(Builder builder) {
        if (builder.eventLoopGroupProvider == null) {
            int ioThreadPoolSize = builder.ioThreadPoolSize;
            ----------省略部分代码---------
                // 创建DefaultEventLoopGroupProvider
            this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize);

        } else {
            this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider;
            this.eventLoopGroupProvider = builder.eventLoopGroupProvider;
        }

        if (builder.eventExecutorGroup == null) {
          ------------------------------
              // 创建eventExecutorGroup
            eventExecutorGroup = DefaultEventLoopGroupProvider.createEventLoopGroup(DefaultEventExecutorGroup.class,
                    computationThreadPoolSize);
            sharedEventExecutor = false;
        } else {
            sharedEventExecutor = builder.sharedEventExecutor;
            eventExecutorGroup = builder.eventExecutorGroup;
        }

        if (builder.timer == null) {
            // 创建时间轮timer
            timer = new HashedWheelTimer(new DefaultThreadFactory("lettuce-timer"));
            sharedTimer = false;
        } else {
            timer = builder.timer;
            sharedTimer = builder.sharedTimer;
        }
        if (builder.eventBus == null) {
            // 创建事件驱动服务总线
            eventBus = new DefaultEventBus(Schedulers.fromExecutor(eventExecutorGroup));
        } else {
            eventBus = builder.eventBus;
        }
-------------------------------------

这里本文主要关注的重点是创建eventExecutorGroup的方法,eventExecutorGroup是EventExecutorGroup类型的,它的主要作用是作为netty channelGroup的EventExecutor。关于EventExecutor参考

•https://www.cnblogs.com/lighten/p/8967630.html•https://juejin.im/post/5de7a0dcf265da33bf1ff42c

io.lettuce.core.resource.DefaultEventLoopGroupProvider#createEventLoopGroup方法代码如下:

public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopGroup(Class<T> type, int numberOfThreads) {
        logger.debug("Creating executor {}", type.getName());
        if (DefaultEventExecutorGroup.class.equals(type)) {
            // 创建默认的eventExecutorGroup
            return new DefaultEventExecutorGroup(numberOfThreads, new DefaultThreadFactory("lettuce-eventExecutorLoop", true));
        }
        if (NioEventLoopGroup.class.equals(type)) {
            // 如果是NioEventLoopGroup类型的会创建NioEventLoopGroup
            return new NioEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-nioEventLoop", true));
        }
        if (EpollProvider.isAvailable() && EpollProvider.isEventLoopGroup(type)) {
            // 创建epoll类型的eventLoop
            return EpollProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-epollEventLoop", true));
        }
        if (KqueueProvider.isAvailable() && KqueueProvider.isEventLoopGroup(type)) {
            // 创建kqueue类型的eventLoop
            return KqueueProvider.newEventLoopGroup(numberOfThreads, new DefaultThreadFactory("lettuce-kqueueEventLoop", true));
        }
        throw new IllegalArgumentException(String.format("Type %s not supported", type.getName()));
    }

这个方法的主要作用是根据类型创建相应类型的EventLoopGroup,对于DefaultClientResources来说,它创建的是DefaultEventExecutorGroup,属于EventExecutorGroup类型的,它的主要作用是处理一些回调事件,异步处理,超时处理等,但是它不是用于处理netty io事件的那个eventLoopGroup。

说在后面的话

hbase client中底层rpc也是使用的也是Netty,它的org.apache.hadoop.hbase.ipc.NettyRpcConnection#connect方法部分截图如下:

这里也会涉及到一个rpcClient.group的初始化过程,它和lettuce中对netty的使用有什么区别呢?感兴趣的可以去分析一下。