DelayQueue
时间:2019-11-27
本文章向大家介绍DelayQueue,主要包括DelayQueue使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
延迟阻塞队列 DelayQueue
延迟阻塞队列 DelayQueue
DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
使用场景
因延迟阻塞队列的特性, 我们一般将 DelayQueue 作用于以下场景 :
- 缓存系统 : 当能够从 DelayQueue 中获取元素时,说该缓存已过期
- 定时任务调度 :
下面我们以缓存系统的应用,看下 DelayQueue 的使用,代码如下:
public class DelayQueueDemo {
static class Cache implements Runnable {
private boolean stop = false;
private Map<String, String> itemMap = new HashMap<>();
private DelayQueue<CacheItem> delayQueue = new DelayQueue<>();
public Cache () {
// 开启内部线程检测是否过期
new Thread(this).start();
}
/**
* 添加缓存
*
* @param key
* @param value
* @param exprieTime 过期时间,单位秒
*/
public void put (String key, String value, long exprieTime) {
CacheItem cacheItem = new CacheItem(key, exprieTime);
// 此处忽略添加重复 key 的处理
delayQueue.add(cacheItem);
itemMap.put(key, value);
}
public String get (String key) {
return itemMap.get(key);
}
public void shutdown () {
stop = true;
}
@Override
public void run() {
while (!stop) {
CacheItem cacheItem = delayQueue.poll();
if (cacheItem != null) {
// 元素过期, 从缓存中移除
itemMap.remove(cacheItem.getKey());
System.out.println("key : " + cacheItem.getKey() + " 过期并移除");
}
}
System.out.println("cache stop");
}
}
static class CacheItem implements Delayed {
private String key;
/**
* 过期时间(单位秒)
*/
private long exprieTime;
private long currentTime;
public CacheItem(String key, long exprieTime) {
this.key = key;
this.exprieTime = exprieTime;
this.currentTime = System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
// 计算剩余的过期时间
// 大于 0 说明未过期
return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTime);
}
@Override
public int compareTo(Delayed o) {
// 过期时间长的放置在队列尾部
if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) {
return 1;
}
// 过期时间短的放置在队列头
if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
return -1;
}
return 0;
}
public String getKey() {
return key;
}
}
public static void main(String[] args) throws InterruptedException {
Cache cache = new Cache();
// 添加缓存元素
cache.put("a", "1", 5);
cache.put("b", "2", 4);
cache.put("c", "3", 3);
while (true) {
String a = cache.get("a");
String b = cache.get("b");
String c = cache.get("c");
System.out.println("a : " + a + ", b : " + b + ", c : " + c);
// 元素均过期后退出循环
if (StringUtils.isEmpty(a) && StringUtils.isEmpty(b) && StringUtils.isEmpty(c)) {
break;
}
TimeUnit.MILLISECONDS.sleep(1000);
}
cache.shutdown();
}
}
复制代码
执行结果如下:
a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
key : c 过期并移除
a : 1, b : 2, c : null
key : b 过期并移除
a : 1, b : null, c : null
key : a 过期并移除
a : null, b : null, c : null
cache stop
复制代码
从执行结果可以看出,因循环内部每次停顿 1 秒,当等待 3 秒后,元素 c 过期并从缓存中清除,等待 4 秒后,元素 b 过期并从缓存中清除,等待 5 秒后,元素 a 过期并从缓存中清除。
实现原理
变量
重入锁
private final transient ReentrantLock lock = new ReentrantLock();
复制代码
用于保证队列操作的线程安全性
优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
复制代码
存储介质,用于保证延迟低的优先执行
leader
leader 指向的是第一个从队列获取元素阻塞等待的线程,其作用是减少其他线程不必要的等待时间。(这个地方我一直没搞明白 怎么就减少其他线程的等待时间了)
condition
private final Condition available = lock.newCondition();
复制代码
条件对象,当新元素到达,或新线程可能需要成为leader时被通知
下面将主要对队列的入队,出队动作进行分析 :
入队 - offer
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入队
q.offer(e);
if (q.peek() == e) {
// 若入队的元素位于队列头部,说明当前元素延迟最小
// 将 leader 置空
leader = null;
// 唤醒阻塞在等待队列的线程
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
复制代码
出队 - poll
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
// 等待 add 唤醒
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 已过期则直接返回队列头节点
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
// 若 leader 不为空
// 说明已经有其他线程调用过 take 操作
// 当前调用线程 follower 挂起等待
available.await();
else {
// 若 leader 为空
// 将 leader 指向当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 当前调用线程在指定 delay 时间内挂起等待
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// leader 处理完之后,唤醒 follower
available.signal();
lock.unlock();
}
}
复制代码
Leader-follower 模式
该图引用自 CSDN 《Leader/Follower多线程网络模型介绍》
摘自--https://juejin.im/post/5bf945b95188254e2a04329b
小结
看了 DelayQueue 的实现 我们大概也明白 PriorityQueue 采用小顶堆的原因了。
原文地址:https://www.cnblogs.com/zhangfengshi/p/11943743.html
- 打造你的专属AI游戏机器人:太空侵略者
- WCF后续之旅(16): 消息是如何分发到Endpoint的--消息筛选(Message Filter)
- 最高大上的展览!腾讯建了一个小“方盒子”,里面全是高科技
- Blend基础-布局控件
- Spring实战——缓存
- Nodejs学习笔记(十二)--- 定时任务(node-schedule)
- Spring实战——缓存
- Spring实战——缓存
- 今天,微信紧急发布小程序最强入口!这3项新能力堪称年度豪礼!
- Python基础知识梳理-第01部分
- 卷积神经网络之卷积操作
- Silverlight体积优化
- 江湖秘笈:说烦了破解、渗透等,不如大家一起聊聊硬盘加密?
- Nodejs学习笔记(十三)— PM2
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PHP JWT初识及其简单示例
- PHP保存Base64图片base64_decode的问题整理
- php实现小程序支付完整版
- Laravel5.1 框架路由基础详解
- Laravel框架实现抢红包功能示例
- Laravel5.1 框架模型创建与使用方法实例分析
- php实现单笔转账到支付宝功能
- PHP发送邮件确认验证注册功能示例【修改别人邮件类】
- PHP实现微信退款功能
- 从零开始玩转PerfDogService---------初探篇
- php+laravel依赖注入知识点总结
- PHP获取当前系统时间的办法小结
- Laravel 中使用简单的方法跟踪用户是否在线(推荐)
- 浅析php怎么实现爬取数据原理
- 在 Laravel 中动态隐藏 API 字段的方法