redis 实现延迟队列
时间:2021-10-11
本文章向大家介绍redis 实现延迟队列,主要包括redis 实现延迟队列使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1、延迟队列工厂
package cn.xs.qishi.micro.plan.common.queue; import cn.xs.ambi.bas.util.StringUtils; import cn.xs.ambi.mgt.redis.RedisManager; import lombok.CustomLog; import org.springframework.util.CollectionUtils; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** * redis 实现延时对列 planReminder * * @author liuxn * @date 2021/10/9 */ @CustomLog public abstract class RedisDelayQueueFactory { public abstract void initDelayQueue(); /** * 最终执行的任务方法 * * @param message 任务消息 */ public abstract void invoke(String message); /** * 设置队列名字 */ public abstract String setQueueName(); /** * 发送消息 * * @param content 内容 * @param delayTime 延迟时间 */ public void send(String content, long delayTime) { RedisManager.getRedisTemplate().opsForZSet().add(setQueueName(), content, (double) delayTime); log.info(">> redis 延迟队列:[" + setQueueName() + "],消息内容:[" + content + "],delayTime:[" + delayTime + "]"); } /** * 队列消费者 */ public void startDelayQueueMachine(Executor asyncExecutor) { while (true) { try { long min = 0; long max = System.currentTimeMillis() ; Set<String> messages = RedisManager.getRedisTemplate().opsForZSet().rangeByScore(setQueueName(), min, max); assert messages != null; log.debug(">> 监控队列:[" + setQueueName() + "],消息数:[" + messages.size() + "]"); // 如果不为空则遍历判断其是否满足取消要求 if (!CollectionUtils.isEmpty(messages)) { for (String message : messages) { if (StringUtils.isBlank(message)) { continue; } long num = RedisManager.getRedisTemplate().opsForZSet().remove(setQueueName(), message); //如果移除成功, 消费 if (num > 0) { asyncExecutor.execute(() -> invoke(message)); } } } } catch (Exception e) { log.error(">> 延迟队列监听异常", e); } finally { // 间隔30秒钟搞一次 try { TimeUnit.SECONDS.sleep(30L); } catch (InterruptedException e) { log.error(">> 延迟队列监听异常", e); } } } } }
2、计划队列实现
package cn.xs.qishi.micro.plan.common.queue; import cn.xs.qishi.entity.pojo.CcPlanSendLog; import cn.xs.qishi.micro.plan.common.constant.Constant; import cn.xs.qishi.micro.plan.common.constant.RedisConstant; import cn.xs.qishi.micro.plan.common.vo.PlanReminderVo; import cn.xs.qishi.micro.plan.service.PlanService; import com.alibaba.fastjson.JSON; import lombok.CustomLog; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * 计划任务提醒 * * @author liuxn * @date 2021/10/11 */ @Component @CustomLog public class PlanReminderQueue extends RedisDelayQueueFactory { @Autowired private PlanService planService; @Override public void initDelayQueue() { List<CcPlanSendLog> list = planService.getPlanSendLog(Constant.PLAN_SEND_LOG_STATUS_0); for (CcPlanSendLog sendLog : list) { PlanReminderVo reminderVo = new PlanReminderVo(); reminderVo.setPlanId(sendLog.getPid()); reminderVo.setPlanLogId(sendLog.getId()); send(reminderVo.toString(),sendLog.getSendTime().getTime()); } log.info(">> 项目启动,提醒任务初始化..条数:"+list.size()); } /** * 最终执行的任务方法 * * @param message 任务消息 */ @Override public void invoke(String message) { log.info(">> 计划任务提醒队列接收到延迟消息:" + message); PlanReminderVo vo = JSON.parseObject(message, PlanReminderVo.class); planService.reminder(vo); } /** * 设置队列名字 */ @Override public String setQueueName() { return RedisConstant.QUEUE_PLAN_REMIND; } }
3、项目启动时初始化。以及开启监听
package cn.xs.qishi.micro.plan.common.runner; import cn.xs.qishi.micro.plan.common.queue.PlanReminderQueue; import lombok.CustomLog; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.util.concurrent.Executor; /** * 计划定时提醒初始类 * * @author liuxn * @date 2021/10/9 */ @Component @CustomLog public class ScheduleReminderRunner implements ApplicationRunner { @Autowired private Executor asyncExecutor; @Autowired private PlanReminderQueue planReminderQueue; @Override public void run(ApplicationArguments args) throws Exception { log.info(">> 项目启动完毕,初始化计划任务闹钟提醒.启动队列监听 "); asyncExecutor.execute(() -> planReminderQueue.startDelayQueueMachine(asyncExecutor)); //队列初始化 planReminderQueue.initDelayQueue(); log.info(">> 项目启动完毕,初始化计划任务闹钟提醒完毕 !!! "); } }
4、注意
private Executor asyncExecutor; 是线程池
planReminderQueue.send("消息内容","延迟时间"); 代码调用
initDelayQueue 方法根据业务需求进行实现
原文地址:https://www.cnblogs.com/lxn0216/p/15394193.html
- 零基础学编程013:import让你飞起来
- 【教程】利用Tensorflow目标检测API确定图像中目标的位置
- 零基础学编程012:画出复利曲线图
- OpenAI发布高度优化的GPU计算内核—块稀疏GPU内核
- SQL SERVER 原来还可以这样玩 FOR XML PATH
- 零基础学编程011:复利数据表问题(总结)
- 一个小程序引发的思考
- 深入内核:DUMP Block的数据读取与脏数据写入影响
- 零基础学编程010:最终可以输出完整的复利数据表了
- 在C#使用文件监控对象FileSystemWatcher 实现数据同步
- 零基础学编程018:条件语句
- 零基础学编程022:函数的世界
- 帝国cms如何调用指定id的文章到首页?
- C 语言 static、extern与指针函数介绍
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 在Linux上如何检查用户所属组详解
- Linux中移除(删除)符号链接的命令
- Linux定时备份数据库到指定邮箱的方法
- 详解nohup /dev/null 2>&1 含义的使用
- centOS7安装jdk1.8的方法
- 你知道一台Linux服务器可以负载多少个连接吗
- Linux环境下安装Nginx及其使用
- CentOS8中的nmcli使用详解
- 在Linux中使用history命令的方法
- Linux服务器部署JavaWeb项目完整教程
- centos 6 安装vsftpd与PAM虚拟用户的方法
- CentOS6.8中/英文环境切换教程图解
- centos7.6 安装Tomcat-8.5.39的方法
- ubuntu14.04安装jdk1.8的教程
- Linux nohup实现后台运行程序及查看(nohup与&)