高性能队列Disruptor框架的详细说明与实战使用

时间:2020-04-19
本文章向大家介绍高性能队列Disruptor框架的详细说明与实战使用,主要包括高性能队列Disruptor框架的详细说明与实战使用使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Disruptor的使用

1.简介

The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX's research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange's infrastructure.

(LMAX Disruptor是一个高性能的线程间消息传递库。它源于LMAX对并发性、性能和非阻塞算法的研究,如今已成为其Exchange基础架构的核心部分。)

-- 引用自GITHUB介绍

Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。以下是介绍wiki地址:

https://github.com/LMAX-Exchange/disruptor/wiki

2.Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:
环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。

3.Disruptor实现特征

另一个关键的实现低延迟的细节就是在Disruptor中利用无锁的算法,所有内存的可见性和正确性都是利用内存屏障或者CAS操作。使用CAS来保证多线程安全,与大部分并发队列使用的锁相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不必像锁一样需要操作系统提供支持,所以每次调用不需要在用户态与内核态之间切换,也不需要上下文切换。
只有一个用例中锁是必须的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的实现方法就是使用Condition实现消费者在新事件到来前等待。许多低延迟系统使用忙等待去避免Condition的抖动,然而在系统忙等待的操作中,性能可能会显著降低,尤其是在CPU资源严重受限的情况下,例如虚拟环境下的WEB服务器。

2.Disruptor实现生产者消费者模型

这里我们按照原作者Demo介绍制作一个放入LongValue的生产者和消费者模型,相关的代码如下所示:

maven依赖
	<dependencies>
		<dependency>
			<groupId>com.lmax</groupId>
			<artifactId>disruptor</artifactId>
			<version>3.2.1</version>
		</dependency>
	</dependencies>
LongEvent
//定义事件event  通过Disruptor 进行交换的数据类型。
public class LongEvent  {

	private Long value;

	public Long getValue() {
		return value;
	}

	public void setValue(Long value) {
		this.value = value;
	}

}
LongEventFactory
public class LongEventFactory implements EventFactory<LongEvent> {

	public LongEvent newInstance() {

		return new LongEvent();
	}

}
LongEventHandler
// 消费者获得数据
public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println("消费者获得数据:" + longEvent.getValue());
    }
}
LongEventProducer
// 生产者
public class LongEventProducer {

    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 获取事件队列的下表位置
        long sequence = ringBuffer.next();
        try {
            // 取出空队列
            LongEvent longEvent = ringBuffer.get(sequence);
            // 给空队列赋值
            longEvent.setValue(byteBuffer.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("生产者发送数据....");
            ringBuffer.publish(sequence);
        }

    }

}
MainTest
public class MainTest {

    public static void main(String[] args) {

        // 1. 创建线程池
        ExecutorService executor = Executors.newCachedThreadPool();

        // 2. 创建工厂
        LongEventFactory longEventFactory = new LongEventFactory();

        // 3.创建ringbuffer 大小
        int ringbuffer = 1024 * 1024; // 2的N次方

        // 4. 创建disruptor
        Disruptor<LongEvent> longEventDisruptor = new Disruptor<>(
                longEventFactory, ringbuffer, executor,
                ProducerType.MULTI, new YieldingWaitStrategy()
        );

        // 5. 连接消费者
        longEventDisruptor.handleEventsWith(new LongEventHandler());
        // 6. 启动
        longEventDisruptor.start();

        // 7.创建ringbuffer容器
        RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();

        // 8.创建生产者
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);

        // 9. 指定缓冲区的大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 0; i < 10; i++) {
            byteBuffer.putLong(0,i);
            longEventProducer.onData(byteBuffer);
        }
        executor.shutdown();
        longEventDisruptor.shutdown();

    }
}

执行结果如下:
生产者发送数据....
生产者发送数据....
生产者发送数据....
生产者发送数据....
消费者获得数据:0
生产者发送数据....
消费者获得数据:1
生产者发送数据....
消费者获得数据:2
生产者发送数据....
消费者获得数据:3
生产者发送数据....
消费者获得数据:4
生产者发送数据....
消费者获得数据:5
生产者发送数据....
消费者获得数据:6
消费者获得数据:7
消费者获得数据:8
消费者获得数据:9

原文地址:https://www.cnblogs.com/charlypage/p/12731222.html