JDK容器学习之Queue:DelayQueue

时间:2022-04-27
本文章向大家介绍JDK容器学习之Queue:DelayQueue,主要内容包括延迟阻塞队列 DelayQueue、I. 阻塞队列的实现逻辑、2. 入队、3. 出队、II. 应用场景及使用case、2. 实现、III. 小结、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

延迟阻塞队列 DelayQueue

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞

延迟阻塞队列DelayQueue的底层是基于优先级队列PriorityQueue来实现的,因此研究延迟阻塞队列,更多的注意力应集中在以下两点

  • 阻塞是如何实现的
  • 应用场景是什么

I. 阻塞队列的实现逻辑

1. 限定

类的声明如下,要求队列中的元素必须继承 Delayed

public class DelayQueue<E extends Delayed> 
    extends AbstractQueue<E>
    implements BlockingQueue<E>
    
    
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

这个限定,主要服务于优先级队列的排序要求,根据延迟时间对元素队列中的元素进行排序

2. 入队

入队的实现逻辑比较简单,为了保证并发安全,实现中实现加锁机制

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

入队的实际是交由优先级队列进行实现,需要注意的是,入队之后,额外的一个操作,如果入队的元素恰好在队列头,执行两个操作

  1. leader赋值为空 (这个是干嘛的,为什么这么做?)
  2. available.signal() 唤醒被阻塞的线程(什么线程被阻塞?)

3. 出队

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

出队的操作同样加锁,获取队列头的元素,判断延期时间是否结束,是才返回结果,否则返回null

注意,这里有两个疑问

  1. 队列中元素getDelay()方法返回值会变么,由谁来改变呢?
  2. 上面的出队没有阻塞,直接返回了null

虽然上面的出队和入队的逻辑比较简单,但是留下的疑问一点都不少,上面的四个问题应该如何解答?

继续看源码,发现还有一个出队的方法, 传入了两个参数表示阻塞的超时时间(即超过这个时间没有返回,则抛一个中端异常)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 获取队列头
            E first = q.peek();
            if (first == null) { // 队列为空
                if (nanos <= 0)
                    // 延时时间已过,直接返回null
                    return null;
                else
                    // 当前线程阻塞 nanos (ns),然后再次循环
                    nanos = available.awaitNanos(nanos);
            } else { // 队列非空
                // 获取队列头元素的延迟时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) // 延迟时间小于0,直接返回队列头
                    return q.poll();
                if (nanos <= 0) 
                // 阻塞时间已过,队列头的延迟时间还没到,则返回null
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                // 无法获取当前的队列头
                //(因为队列头延迟时间大于阻塞时间,即队列头不生效)
                // 继续阻塞,以期望此时可能新增一个到队列头
                    nanos = available.awaitNanos(nanos);
                else {
                // 可以获取当前队列头
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                    // 阻塞到队列头生效
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

分析: 以当前队列为空作为条件

上面代码的流程如下:

  1. 阻塞当前方法
  2. 此时若新入队一个元素,根据前面入队方法,此时表示新入队的就是在队列头,会发出一个唤醒的操作
  3. 此时阻塞的线程被唤醒,继续循环,再次获取队列头(此时非空了)
  4. 判断队列头的元素延迟时间是否已过
  • 已过,则弹出队列头,并返回
  • 未过,继续判断阻塞时间是否小于0
    • 是则表示已经过了预期的阻塞时间,直接返回null
    • 若阻塞时间小于队列头的延迟时间(表示可以当前的队列头,不是本方法预期的),则继续阻塞当前线程,以期望此时有新入队的元素可能被再次获取
    • 否则表示当前线程获可以获取现在的队列头,记录下当前线程,并阻塞,等到队列头元素生效

继续化重点

  • 添加元素到队列头会唤起出队的阻塞线程
  • 被唤起之后,出队线程会再次获取队列头元素,判断是否可以返回(即getDelay返回小于0)

上面的方法因为加上了一个超时时间(即在指定的时间内依然无法返回时,断掉阻塞),分析起来可能不太顺畅,再看源码,还有一个take方法,逻辑与上面相似,只是砍掉了超时断开阻塞的逻辑

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
            // 队列头为空,则阻塞,直到新增一个入队为止
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                // 若队列头元素已生效,则直接返回
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                // leader 非空时,表示有其他的一个线程在出队阻塞中
                // 此时挂住当前线程,等待另一个线程出队完成
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                    // 等待队列头元素生效
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
        // 当前线程出队完成,通知其他出队阻塞的线程继续执行
            available.signal();
        lock.unlock();
    }
}

通过了之前的烧脑逻辑之后,再看这个就简单很多了

1. 队列为空,则阻塞,直到有个线程完成入队操作

2. 获取队列头,若队列头已生效,则直接返回

3. 若队列头未生效

  1. 若有另一个线程已经处于等待队列头生效的阻塞过程中,则阻塞当前线程,直到另一个线程完成出队操作
  2. 若没有其他线程阻塞在出队过程中,即当前线程为第一个获取队列头的线程
    • 标识当前线程处于等待队列头生效的阻塞中(leader = thisThread
    • 阻塞当前线程,等待队列头生效
    • 队列头生效之后,清空标识(leader=null)
    • 再次进入循环,获取队列头并返回
  3. 最后步骤1中被阻塞的线程

因此可以愉快的解答上面的四个问题

添加一个元素到队列头

  1. leader赋值为空 (这个是干嘛的,为什么这么做?) leader记录了被阻塞在等待队列头生效的线程 新增一个元素到队列头,表示等待原来队列头生效的阻塞的线程已经失去了阻塞的意义,此时需要获取新的队列头进行返回了
  2. available.signal() 唤醒被阻塞的线程(什么线程被阻塞?) 获取队列头的线程被唤起,主要有两种场景: 1. 之前队列为空,导致被阻塞的线程 2. 之前队列非空,但是队列头没有生效导致被阻塞的线程

普通的出队方法

  1. 队列中元素getDelay()方法返回值会变么,由谁来改变呢? 必须得变,否则这个元素一直不生效,将直接导致线程一直阻塞 由队列中的元素实现类来保证,返回值是逐渐变小的
  2. 上面的出队没有阻塞,直接返回了null 需要阻塞获取队列头,用 `take`, `poll(long,TimeUnit)`来代替

II. 应用场景及使用case

上面分析的是阻塞队列的实现原理,接下来举一个实例来解析下这个延迟阻塞队列的使用姿势,加深下理解

1. 一个实例场景

(简化了在简化之后的,与实际会有一些区别,请勿完全认定合理)

比如和电商的详情页展示,为了提高应用的性能,我们将整个页面进行了缓存,当详情页发生修改后,我们会更新缓存的内容

因此为了保证缓存的内容和实际的内容是一致的,我们需要一个对账的任务,当详情页修改后,并且更新缓存完成之后,我们需要再次对比缓存和实际内容的一致性;

此时一个异步的任务可以这么设计:监听详情页修改的事件,延迟一段时间,然后再对比缓存和实际内容的一致性 (这里延迟一段时间主要是为了保证缓存已经更新完成)

2. 实现

详情信息 DetailInfo

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DetailInfo {

    private int itemId;

    private String title;

    private String desc;

    private int price;

}

延迟队列的元素 UpdateTask

注意其中 getDelay() 的实现逻辑,根据当前时间与预订的延迟生效时间进行比较

@Data
@AllArgsConstructor
public class UpdateTask implements Delayed {

    private int itemId;

    private long delayTime;


    @Override
    public long getDelay(TimeUnit unit) {
        return delayTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (getDelay(TimeUnit.MICROSECONDS) - o.getDelay(TimeUnit.MICROSECONDS));
    }
}

主业务逻辑

更新事件的监听订阅使用了 Guava的EventBus来处理,如有疑问可以搜索EventBus的使用姿势

public class DetailManager {
    // 模拟真实数据存储空间
    private Map<Integer, DetailInfo> realMap = new ConcurrentHashMap<>();
    // 模拟缓存空间
    private Map<String, String> cache = new ConcurrentHashMap<>();

    private Gson gson = new Gson();

    private String getCacheKey(int itemId) {
        return "detailInfo_" + itemId;
    }

    // eventBus 用于发送更新事件;异步接受更新事件
    private AsyncEventBus eventBus;
    private void init() {
        DetailInfo detailInfo = new DetailInfo(1, "onw", "第一个测试", 100);
        DetailInfo detailInfo2 = new DetailInfo(2, "two", "第二个测试", 200);

        realMap.put(detailInfo.getItemId(), detailInfo);
        realMap.put(detailInfo2.getItemId(), detailInfo2);

        cache.put(getCacheKey(detailInfo.getItemId()), gson.toJson(detailInfo));
        cache.put(getCacheKey(detailInfo2.getItemId()), gson.toJson(detailInfo2));

        eventBus = new AsyncEventBus("Validate-Thread", 
            Executors.newFixedThreadPool(2));
        eventBus.register(this);
    }

    // 模拟更新商品
    public void updateDetail(int itemId) {
        DetailInfo detailInfo = realMap.get(itemId);
        long now = System.currentTimeMillis();
        detailInfo.setTitle("title_" + itemId + "_" + now);
        cache.put(getCacheKey(itemId), gson.toJson(detailInfo));

        // 发送一个修改的事件
        eventBus.post(new UpdateTask(itemId, now + 5000));
        System.out.println("[UpdateInfo]>>>ItemId: " + itemId + " updateTime: " + now + " validateTime: " + (now + 5000));
    }

    // 延迟队列
    private DelayQueue<UpdateTask> delayQueue = new DelayQueue<>();
    /**
     * 监听修改事件
     * @param updateTask
     */
    @Subscribe
    public void verify(UpdateTask updateTask) {
        long getTaskTime = System.currentTimeMillis();
        delayQueue.put(updateTask);

        try {
            UpdateTask task = delayQueue.take();
            long processTime = System.currentTimeMillis();


            DetailInfo real = realMap.get(task.getItemId());
            String cacheObj = cache.get(getCacheKey(task.getItemId()));
            boolean ans = gson.toJson(real).equals(cacheObj);
            System.out.println("validate itemId: " + updateTask.getItemId() +
                    " getEventTime: " + getTaskTime +
                    " processTime:" + processTime + " ans: " + ans);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DetailManager detailManager = new DetailManager();
        detailManager.init();

        // 开始修改
        detailManager.updateDetail(1);
        Thread.sleep(20);
        detailManager.updateDetail(2);

        Thread.sleep(35000);
    }
}

简单说明主流程

  1. 首先初始化DetailManager
  2. 更新两个商品,在更新的逻辑中实现以下步骤
  • 更新实际的商品内容
  • 更新缓存的内容
  • 发送一条商品更新的消息
  1. 异步监听更新消息任务逻辑
  • 将消息塞入延迟队列
  • 从延迟队列中后去已经生效的消息,然后对账

输出结果如下

[UpdateInfo]>>>ItemId: 1 updateTime: 1508677959067 validateTime: 1508677964067
[UpdateInfo]>>>ItemId: 2 updateTime: 1508677959103 validateTime: 1508677964103
Thread[pool-1-thread-1,5,main]>>> validate itemId: 1 getEventTime: 1508677959078 processTime:1508677964067 ans: true
Thread[pool-1-thread-2,5,main]>>> validate itemId: 2 getEventTime: 1508677964067 processTime:1508677964103 ans: true

从上面的输出可以得知,实际验证的时间戳和预期的时间错是相同的

III. 小结

延迟阻塞队列DelayQueue,学习下来之后感觉非常有意思,首先是加深了使用姿势的了解,其次对其中的阻塞,唤醒机制有了一定了解,涨了锁使用知识的见识(这里面还有一个非常有意思的东西就是 ConditionReentrantLock的使用,后续线程安全篇的研究可以以此作为应用场景)

简单小结上面的学习内容

  1. 队列中更不能有null
  2. 底层使用的是优先级队列 PriorityQueue
  3. 通过锁来实现线程安全
  4. 需要使用阻塞的获取元素时,请使用 take(), poll(long, TimeUnit)两方法之一
  5. 要求延迟阻塞队列的元素实现 Delayed接口,内部实现的getDelay方法,要求返回值越来越小(如果一直大于0,这个延迟任务就一直无法执行了)