高性能队列Disruptor框架的详细说明与实战使用
Disruptor的使用
1.简介
The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX's research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange's infrastructure.
(LMAX Disruptor是一个高性能的线程间消息传递库。它源于LMAX对并发性、性能和非阻塞算法的研究,如今已成为其Exchange基础架构的核心部分。)
-- 引用自GITHUB介绍
Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。以下是介绍wiki地址:
https://github.com/LMAX-Exchange/disruptor/wiki
2.Disruptor的设计方案
Disruptor通过以下设计来解决队列速度慢的问题:
环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。
3.Disruptor实现特征
另一个关键的实现低延迟的细节就是在Disruptor中利用无锁的算法,所有内存的可见性和正确性都是利用内存屏障或者CAS操作。使用CAS来保证多线程安全,与大部分并发队列使用的锁相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不必像锁一样需要操作系统提供支持,所以每次调用不需要在用户态与内核态之间切换,也不需要上下文切换。
只有一个用例中锁是必须的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的实现方法就是使用Condition实现消费者在新事件到来前等待。许多低延迟系统使用忙等待去避免Condition的抖动,然而在系统忙等待的操作中,性能可能会显著降低,尤其是在CPU资源严重受限的情况下,例如虚拟环境下的WEB服务器。
2.Disruptor实现生产者消费者模型
这里我们按照原作者Demo介绍制作一个放入LongValue的生产者和消费者模型,相关的代码如下所示:
maven依赖
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
LongEvent
//定义事件event 通过Disruptor 进行交换的数据类型。
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
LongEventFactory
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
LongEventHandler
// 消费者获得数据
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println("消费者获得数据:" + longEvent.getValue());
}
}
LongEventProducer
// 生产者
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 获取事件队列的下表位置
long sequence = ringBuffer.next();
try {
// 取出空队列
LongEvent longEvent = ringBuffer.get(sequence);
// 给空队列赋值
longEvent.setValue(byteBuffer.getLong(0));
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("生产者发送数据....");
ringBuffer.publish(sequence);
}
}
}
MainTest
public class MainTest {
public static void main(String[] args) {
// 1. 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 2. 创建工厂
LongEventFactory longEventFactory = new LongEventFactory();
// 3.创建ringbuffer 大小
int ringbuffer = 1024 * 1024; // 2的N次方
// 4. 创建disruptor
Disruptor<LongEvent> longEventDisruptor = new Disruptor<>(
longEventFactory, ringbuffer, executor,
ProducerType.MULTI, new YieldingWaitStrategy()
);
// 5. 连接消费者
longEventDisruptor.handleEventsWith(new LongEventHandler());
// 6. 启动
longEventDisruptor.start();
// 7.创建ringbuffer容器
RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
// 8.创建生产者
LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
// 9. 指定缓冲区的大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 0; i < 10; i++) {
byteBuffer.putLong(0,i);
longEventProducer.onData(byteBuffer);
}
executor.shutdown();
longEventDisruptor.shutdown();
}
}
执行结果如下:
生产者发送数据....
生产者发送数据....
生产者发送数据....
生产者发送数据....
消费者获得数据:0
生产者发送数据....
消费者获得数据:1
生产者发送数据....
消费者获得数据:2
生产者发送数据....
消费者获得数据:3
生产者发送数据....
消费者获得数据:4
生产者发送数据....
消费者获得数据:5
生产者发送数据....
消费者获得数据:6
消费者获得数据:7
消费者获得数据:8
消费者获得数据:9
原文地址:https://www.cnblogs.com/charlypage/p/12731222.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 给Linux增加swap内存
- 网鼎杯2018-Fakebook
- 强网杯2019-高明的黑客
- CISCN2019华北赛区Day2-HackWorld
- ZJCTF-NiZhuanSiWei
- xxe漏洞学习
- De1CTF2019-SSRFME
- BJDCTF2nd-EasyMd5
- BJDCTF2nd-fakegoogle
- java_Scanner类、Random类、ArrayList 类的使用
- 使用Python获取Oracle索引信息
- 监控Oracle数据泵状态
- MySQL MHA部署 Part 5 MHA部署指南
- MySQL MHA部署 Part 6 MHA故障转移测试
- 一步步搭建基于GTID的MySQL复制