redis 实现延迟队列

时间:2021-10-11
本文章向大家介绍redis 实现延迟队列,主要包括redis 实现延迟队列使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1、延迟队列工厂

package cn.xs.qishi.micro.plan.common.queue;

import cn.xs.ambi.bas.util.StringUtils;
import cn.xs.ambi.mgt.redis.RedisManager;
import lombok.CustomLog;
import org.springframework.util.CollectionUtils;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * redis 实现延时对列 planReminder
 *
 * @author liuxn
 * @date 2021/10/9
 */
@CustomLog
public abstract class RedisDelayQueueFactory {


    public abstract void initDelayQueue();

    /**
     * 最终执行的任务方法
     *
     * @param message 任务消息
     */
    public abstract void invoke(String message);


    /**
     * 设置队列名字
     */
    public abstract String setQueueName();

    /**
     * 发送消息
     *
     * @param content   内容
     * @param delayTime 延迟时间
     */
    public void send(String content, long delayTime) {
        RedisManager.getRedisTemplate().opsForZSet().add(setQueueName(), content, (double) delayTime);
        log.info(">> redis 延迟队列:[" + setQueueName() + "],消息内容:[" + content + "],delayTime:[" + delayTime + "]");
    }

    /**
     * 队列消费者
     */
    public void startDelayQueueMachine(Executor asyncExecutor) {
        while (true) {
            try {
                long min = 0;
                long max = System.currentTimeMillis() ;
                Set<String> messages = RedisManager.getRedisTemplate().opsForZSet().rangeByScore(setQueueName(), min, max);
                assert messages != null;
                log.debug(">> 监控队列:[" + setQueueName() + "],消息数:[" + messages.size() + "]");
                // 如果不为空则遍历判断其是否满足取消要求
                if (!CollectionUtils.isEmpty(messages)) {
                    for (String message : messages) {
                        if (StringUtils.isBlank(message)) {
                            continue;
                        }
                        long num = RedisManager.getRedisTemplate().opsForZSet().remove(setQueueName(), message);
                        //如果移除成功, 消费
                        if (num > 0) {
                            asyncExecutor.execute(() -> invoke(message));
                        }
                    }
                }
            } catch (Exception e) {
                log.error(">> 延迟队列监听异常", e);
            } finally {
                // 间隔30秒钟搞一次
                try {
                    TimeUnit.SECONDS.sleep(30L);
                } catch (InterruptedException e) {
                    log.error(">> 延迟队列监听异常", e);
                }
            }
        }
    }

}

2、计划队列实现

package cn.xs.qishi.micro.plan.common.queue;

import cn.xs.qishi.entity.pojo.CcPlanSendLog;
import cn.xs.qishi.micro.plan.common.constant.Constant;
import cn.xs.qishi.micro.plan.common.constant.RedisConstant;
import cn.xs.qishi.micro.plan.common.vo.PlanReminderVo;
import cn.xs.qishi.micro.plan.service.PlanService;
import com.alibaba.fastjson.JSON;
import lombok.CustomLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 计划任务提醒
 *
 * @author liuxn
 * @date 2021/10/11
 */
@Component
@CustomLog
public class PlanReminderQueue extends RedisDelayQueueFactory {


    @Autowired
    private PlanService planService;

    @Override
    public void initDelayQueue() {
        List<CcPlanSendLog> list = planService.getPlanSendLog(Constant.PLAN_SEND_LOG_STATUS_0);
        for (CcPlanSendLog sendLog : list) {
            PlanReminderVo reminderVo = new PlanReminderVo();
            reminderVo.setPlanId(sendLog.getPid());
            reminderVo.setPlanLogId(sendLog.getId());
            send(reminderVo.toString(),sendLog.getSendTime().getTime());
        }
        log.info(">> 项目启动,提醒任务初始化..条数:"+list.size());
    }

    /**
     * 最终执行的任务方法
     *
     * @param message 任务消息
     */
    @Override
    public void invoke(String message) {
        log.info(">> 计划任务提醒队列接收到延迟消息:" + message);
        PlanReminderVo vo = JSON.parseObject(message, PlanReminderVo.class);
        planService.reminder(vo);
    }

    /**
     * 设置队列名字
     */
    @Override
    public String setQueueName() {
        return RedisConstant.QUEUE_PLAN_REMIND;
    }
}

  3、项目启动时初始化。以及开启监听

package cn.xs.qishi.micro.plan.common.runner;

import cn.xs.qishi.micro.plan.common.queue.PlanReminderQueue;
import lombok.CustomLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;

/**
 * 计划定时提醒初始类
 *
 * @author liuxn
 * @date 2021/10/9
 */
@Component
@CustomLog
public class ScheduleReminderRunner implements ApplicationRunner {

    @Autowired
    private Executor asyncExecutor;
    @Autowired
    private PlanReminderQueue planReminderQueue;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info(">> 项目启动完毕,初始化计划任务闹钟提醒.启动队列监听 ");
        asyncExecutor.execute(() -> planReminderQueue.startDelayQueueMachine(asyncExecutor));
        //队列初始化
        planReminderQueue.initDelayQueue();
        log.info(">> 项目启动完毕,初始化计划任务闹钟提醒完毕 !!! ");

    }
}

4、注意

private Executor asyncExecutor; 是线程池
planReminderQueue.send("消息内容","延迟时间"); 代码调用
initDelayQueue 方法根据业务需求进行实现

  

  

原文地址:https://www.cnblogs.com/lxn0216/p/15394193.html