JUC之striped64
striped64
简介
striped64是java8用来并发计数新加的组件,在并发环境下有更好的性能,64指的是计数64bit的数,即Long类型的数和Double类型的数。striped64是个抽象类,jdk提供了四个实现类,LongAdder LongAccumulator DoubleAdder DoubleAccumulator 我们拿数据说话,分别在并发情况下用synchronized,atomic,striped64进行基准测试。
代码
public class Striped64BenchMark {
private static final Object lock = new Object();
private static long add;
private static AtomicLong add2 = new AtomicLong();
private static LongAdder add3 = new LongAdder();
public static void main(String[] args) throws InterruptedException {
CyclicBarrier begin = new CyclicBarrier(10);
CountDownLatch end = new CountDownLatch(10);
long maxSum = 100000000;
ExecutorService executorService = Executors.newFixedThreadPool(10);
/*synchronized*/
AtomicLong beginTime = new AtomicLong();
for (int j = 0; j < 10; j++) {
executorService.execute(() -> {
try {
synchronized (begin) {
if (begin.getNumberWaiting() == 0) {
beginTime.set(System.currentTimeMillis());
}
}
begin.await();
long i = 0;
do {
synchronized (lock) {
add++;
}
}while (++i < maxSum);
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
end.await();
System.out.println("add1 use time is:" + (System.currentTimeMillis() - beginTime.get()));
/*atomic*/
begin.reset();
AtomicLong beginTime2 = new AtomicLong();
CountDownLatch end2 = new CountDownLatch(10);
for (int j = 0; j < 10; j++) {
executorService.execute(() -> {
try {
synchronized (begin) {
if (begin.getNumberWaiting() == 0) {
beginTime2.set(System.currentTimeMillis());
}
}
begin.await();
long i = 0;
do {
add2.getAndIncrement();
}while (++i < maxSum);
end2.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
end2.await();
System.out.println("add2 use time is:" + (System.currentTimeMillis() - beginTime2.get()));
/*striped64*/
begin.reset();
AtomicLong beginTime3 = new AtomicLong();
CountDownLatch end3 = new CountDownLatch(10);
for (int j = 0; j < 10; j++) {
executorService.execute(() -> {
try {
synchronized (begin) {
if (begin.getNumberWaiting() == 0) {
beginTime3.set(System.currentTimeMillis());
}
}
begin.await();
long i = 0;
do {
add3.increment();
}while (++i < maxSum);
end3.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
end3.await();
System.out.println("add3 use time is:" + (System.currentTimeMillis() - beginTime3.get()));
}
}
运行结果
add1 use time is:37309
add2 use time is:17996
add3 use time is:4525
设计思想
性能差距还是很明显的,为甚striped64这么快?要从striped64设计思想说起。
striped64采用了类似分段锁的机制,内部数据结构一个long类型的base变量,和持有long类型变量的Cell[]数组。单线程情况下,只需要对base做加减计数,当遇到多线程的时候,计算hash值,找到对应的Cell,用cas来改变Cell中的值,最后的所求的结果就是base+所有Cell中的值,原理相对简单,但代码还是有很多需要关注的点。
接下来撸一撸代码,先来看一看Cell的数据结构。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
结构比较简单,只含有了一个volatile的关键字修饰的long型的值,其他都是cas相关的东西,可以不用太关注。需要注意的是@sun.misc.Contended这个注解,代表会对这个类进行缓存行填充,关于缓存行简单介绍一下。
如图:
在核心1上运行的线程想更新变量X,同时核心2上的线程想要更新变量Y。不幸的是,这两个变量在同一个缓存行中。每个线程都要去竞争缓存行的所有权来更新变量。如果核心1获得了所有权,缓存子系统将会使核心2中对应的缓存行失效。当核心2获得了所有权然后执行更新操作,核心1就要使自己对应的缓存行失效。这会来来回回的经过L3缓存,大大影响了性能。如果互相竞争的核心位于不同的插槽,就要额外横跨插槽连接,问题可能更加严重。
另外除了base,cells两个变量外,还有一个变量用来存储扩容或者初始化的时候是否有别的线程竞争cellsBusy
核心方法
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 初始化
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
// 最后的桶不为空则 true,也用于控制扩容,false重试。
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
//已经初始化
if ((as = cells) != null && (n = as.length) > 0) {
// 对应桶是空的
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // 尝试关联新的Cell
// 创建cell
Cell r = new Cell(x);
//double check
if (cellsBusy == 0 && casCellsBusy()) {
//创建cell
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null)
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
if (created)
break; // 成功退出
continue; // 已经有别的线程进入了
}
}
// 锁被占用了,扩容或者重hash进入下轮循环
collide = false;
}
// 如何传入的wasUncontended,重hash,进入下轮循环
else if (!wasUncontended)
wasUncontended = true;
// 尝试更新当前桶,成功直接退出
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 表大小达到上限或扩容了;
// 最后重hash,进入下轮循环
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 如果不存在冲突,则设置为存在冲突
else if (!collide)
collide = true;
// 有竞争,需要扩容
else if (cellsBusy == 0 && casCellsBusy()) {
// 锁空闲且成功获取到锁
try {
if (cells == as) { // 距上一次检查后表没有改变,扩容:加倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0; // 释放锁
}
collide = false;
continue; // 在扩容后的表上重试
}
// 没法获取锁,重散列,尝试其他桶
h = advanceProbe(h);
}
//初始celll
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 加锁的情况下初始化表
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
if (init)
break; // 成功初始化,已更新,跳出循环
}
//有其他线程在初始化cell,尝试更新base的值
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))));
break;
}
}
实现类
以LongAdder为例
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//先判断是cells为空,不为空的概率可能大点,而且耗时操作较小,所以先判断,如果为空,表示单线程计数,尝试进行cas操作
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//赋值对应的桶值失败,进入复杂的计数过程
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
获取值方法
//此方法或者的值可能会有误差,加入统计的时候依然进行计数,最后结果就会不精确。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
- lambda+reduce的一句艰深代码
- CRT连接mysql数据库操作
- 基于springMVC拦截器实现操作日志统计
- datepicker小插件(日期时间 & 日期 & 月份)
- 封装好的MAP工具类和HBASE工具类
- JSP+ajax+springMVC+MayBatis处理excel上传导入
- 绚丽的javascript拾色器(不兼容IE8及以下)
- 魔波广告恶意病毒简析
- javascript生成.xls文件(兼容IE&Chrome&Firefox)
- 没用的程序设计题-美甲帮笔试题
- MongoDB Java
- JSON.parse()和JSON.stringify()
- jquery获取主机地址和端口
- 前端验证码绘制(canvas)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- select 高级查询之子查询
- Java 最实用的文件读写
- MySQL基本操作
- MySQL 索引
- notepad++ 配置c++及运行
- Leetcode 687. 最长同值路径
- Leetcode 1071. 字符串的最大公因子
- PAT (Advanced Level) Practice 1146 Topological Order (25分)
- 【风险预警】RPCBind服务被利用进行UDP反射DDoS攻击
- MySQL DDL 操作
- Leetcode 169 多数元素
- Codeforces Round #424 (Div. 1, rated, based on VK Cup Finals) B. Cards Sorting 树状数组+技巧
- MySQL DML 操作
- select 查询基础
- 带大家体验一下删库跑路,不是虚拟机,是真的服务器哦!