Redis集群环境下的键值空间监听事件实现方案

时间:2019-08-22
本文章向大家介绍Redis集群环境下的键值空间监听事件实现方案,主要包括Redis集群环境下的键值空间监听事件实现方案使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

一直想记录工作中遇到的问题和解决的方法,奈何没有找到一方乐土,最近经常反思,是否需要记录平时的点滴,后台还是决定下定决心记录一些,以便以后用到的时候找不着,实现这样的一个功能主要也是业务所需要的。

需求:要求统计所有会员在线人数,并根据会员在线状态同步改变人数。

之前用户登录使用session去控制,那么可以通过session进行在线用户人数统计,后来实现无状态不在依赖session作为用户在线的标准,使用Redis替换了Session,那么用户直接退出也好做,但是会存在用户直接关闭页面的情况,那么这个时候用户的缓存凭证没有主动触发去主动删掉,所以思来想去查了一些资料通过缓存的Key监听事件来处理,但是网上的大都是单机版的,对于集群环境下的就少之又少,由于集群是有多个节点,并且key采用的是分片的方式存储在不同片区,然而使用Spring的RedisTemplate的又不支持集群环境下的监听事件,由于每次与Redis服务系保持一个有效连接就可以,那么就有可能某个key所在的片区并没有被监听到事件,因此需要在源码上做一些调整,认为让它遍历所有集群节点用来监听集群中的key。所以通过翻阅资料实现下面的功能,还算圆满的完成了需求任务,当然如果看官看到某些似曾相识的地方请谅解,我也是从大家的经验中寻找方法有些地方与大家的相似也属正常。

第一步:修改Redis的配置文件,这一步可让《运维》同事协助操作,在配置文件中添加如下内容:

Redis的配置文件:

############################# EVENT NOTIFICATION ##############################

# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications
#
# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
#
# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:
#
# K Keyspace events, published with __keyspace@<db>__ prefix.
# E Keyevent events, published with __keyevent@<db>__ prefix.
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# $ String commands
# l List commands
# s Set commands
# h Hash commands
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
#
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name __keyevent@0__:expired use:
#
notify-keyspace-events Ex
#
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.

此时我们需要开启缓存的键空间通知事件的配置:notify-keyspace-events Ex

第二步配置Redis信息,我采用的是yml格式文件,同时配置了三套模式分别为单机模式、哨兵模式、集群模式,各位看官在配置文件中可自行开启或关闭。

#缓存配置
  redis:
    database: 0
    #host: 127.0.0.1
    #port: 6379
    #sentinel:
      #master: mymaster
      #nodes: 192.168.0.223:27001
    #timeout: 6000ms
    password: Aa123456    
    cluster:
      max-redirects: 3   #获取失败 最大重定向次数
      nodes:
        - 192.168.104.7:6379
        - 192.168.104.7:6380
        - 192.168.104.8:6379
        - 192.168.104.8:6380
        - 192.168.104.9:6379
        - 192.168.104.9:6380
    lettuce:
      pool:
        max-active: 1000  #连接池最大连接数(使用负值表示没有限制)
        max-idle: 10      #连接池中的最大空闲连接
        min-idle: 5       #连接池中的最小空闲连接
        max-wait: 3000      #连接池最大阻塞等待时间(使用负值表示没有限制)

第三步重写缓存的默认配置函数了,并绑定监听的主题,从程序中我们可以看到"__keyevent@0__:expired" 意思就是订阅Redis的第一个数据库的键值失效事件,这里需要多说一下,Redis有16个数据库,系统默认使用第一个苦也就是0,如果你在配置的时候不想使用系统默认数据库,你可以通过配置文件指定库,那么你这里就需要根据你指定的库做键值事件。 

import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import redis.clients.jedis.Jedis;

@Configuration
@ConditionalOnClass({ JedisConnection.class, RedisOperations.class, Jedis.class, MessageListener.class })
@AutoConfigureAfter({ JacksonAutoConfiguration.class,RedisAutoConfiguration.class })
public class RedisAutoConfiguration {

    @Configuration
    @ConditionalOnExpression("!'${spring.redis.host:}'.isEmpty()")
    public static class RedisStandAloneAutoConfiguration {
        @Bean
        public RedisMessageListenerContainer customizeRedisListenerContainer(
                RedisConnectionFactory redisConnectionFactory,MessageListener messageListener) {
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
            redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("__keyevent@0__:expired"));
            return redisMessageListenerContainer;
        }
    }


    @Configuration
    @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
    public static class RedisClusterAutoConfiguration {
        @Bean
        public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
                RedisConnectionFactory redisConnectionFactory) {
            RedisMessageListenerFactory beans = new RedisMessageListenerFactory();
            beans.setBeanFactory(beanFactory);
            beans.setRedisConnectionFactory(redisConnectionFactory);
            return beans;
        }
    }
}

第四步实现《org.springframework.context.ApplicationListener》的onApplicationEvent方法,主要的目的就是监听集群中的所有节点,并且给《org.springframework.data.redis.listener.RedisMessageListenerContainer》创建一个键空间的主题事件。

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import redis.clients.jedis.JedisShardInfo;

public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {

    @Value("${spring.redis.password}")
    private String password;
    
    private DefaultListableBeanFactory beanFactory;

    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private MessageListener messageListener;

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    }

    public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
        if (redisClusterConnection != null) {
            Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes();
            for (RedisClusterNode node : nodes) {
                if (node.isMaster()) {
                    String containerBeanName = "messageContainer" + node.hashCode();
                    if (beanFactory.containsBean(containerBeanName)) {
                        return;
                    }
                    JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort());
                    jedisShardInfo.setPassword(password);
                    JedisConnectionFactory factory = new JedisConnectionFactory(jedisShardInfo);
                    BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                            .genericBeanDefinition(RedisMessageListenerContainer.class);
                    containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
                    containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
                    containerBeanDefinitionBuilder.setLazyInit(false);
                    beanFactory.registerBeanDefinition(containerBeanName,
                            containerBeanDefinitionBuilder.getRawBeanDefinition());

                    RedisMessageListenerContainer container = beanFactory.getBean(containerBeanName,
                            RedisMessageListenerContainer.class);
                    String listenerBeanName = "messageListener" + node.hashCode();
                    if (beanFactory.containsBean(listenerBeanName)) {
                        return;
                    }
                    container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired"));
                    container.start();
                }
            }
        }
    }

}

第五步实现监听事件触发后的业务代码

import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import com.cn.tianxia.api.common.v2.CacheKeyConstants;
import com.cn.tianxia.api.project.v2.OnlineUserEntity;
import com.cn.tianxia.api.service.v2.OnlineUserService;
import com.cn.tianxia.api.utils.SpringContextUtils;

/**
 * @ClassName KeyExpiredEventMessageListener
 * @Description redis失效事件
 * @author Hardy
 * @Date 2019年5月20日 下午2:53:33
 * @version 1.0.0
 */
@Component
public class KeyExpiredEventMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expired = message.toString();
        String onlineKey = CacheKeyConstants.ONLINE_USER_KEY_UID;
        if (expired.contains(onlineKey)) {
            String uid = expired.replace(CacheKeyConstants.ONLINE_USER_KEY_UID, "");
            if (StringUtils.isNoneEmpty(uid)) {
                OnlineUserService onlineUserService = (OnlineUserService) SpringContextUtils
                        .getBeanByClass(OnlineUserService.class);
                OnlineUserEntity onlineUser = onlineUserService.getByUid(uid);
                if (onlineUser != null) {
                    onlineUser.setLogoutTime(System.currentTimeMillis());
                    onlineUser.setOffStatus((byte) 0);
                    onlineUser.setIsOff((byte) 1);
                    onlineUser.setUid(Long.parseLong(uid));
                    onlineUserService.insertOrUpdateOnlineUser(onlineUser);
                }
            }
        }
    }

}

整个过程实现这五步就完成了Redis的键值空间事件了,其实Redis本身提供订阅与发布的功能,追其根本就是通过订阅Redis服务器的发布的一个主题进行消费。

原文地址:https://www.cnblogs.com/xfearless/p/11393438.html