Java并发包类总览
并发容器
这些容器的关键方法大部分都实现了线程安全的功能,却不使用同步关键字(synchronized)。值得注意的是Queue接口本身定义的几个常用方法的区别,
1.add方法和offer方法的区别在于超出容量限制时前者抛出异常,后者返回false;
2.remove方法和poll方法都从队列中拿掉元素并返回,但是他们的区别在于空队列下操作前者抛出异常,而后者返回null;
3.element方法和peek方法都返回队列顶端的元素,但是不把元素从队列中删掉,区别在于前者在空队列的时候抛出异常,后者返回null。
阻塞队列:
- BlockingQueue.class,阻塞队列接口
- BlockingDeque.class,双端阻塞队列接口
- ArrayBlockingQueue.class,阻塞队列,数组实现
- LinkedBlockingDeque.class,阻塞双端队列,链表实现
- LinkedBlockingQueue.class,阻塞队列,链表实现
- DelayQueue.class,阻塞队列,并且元素是Delay的子类,保证元素在达到一定时间后才可以取得到
- PriorityBlockingQueue.class,优先级阻塞队列
- SynchronousQueue.class,同步队列,但是队列长度为0,生产者放入队列的操作会被阻塞,直到消费者过来取,所以这个队列根本不需要空间存放元素;有点像一个独木桥,一次只能一人通过,还不能在桥上停留
非阻塞队列:
- ConcurrentLinkedDeque.class,非阻塞双端队列,链表实现
- ConcurrentLinkedQueue.class,非阻塞队列,链表实现
转移队列:
- TransferQueue.class,转移队列接口,生产者要等消费者消费的队列,生产者尝试把元素直接转移给消费者
- LinkedTransferQueue.class,转移队列的链表实现,它比SynchronousQueue更快
其它容器:
- ConcurrentMap.class,并发Map的接口,定义了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)这四个并发场景下特定的方法
- ConcurrentHashMap.class,并发HashMap
- ConcurrentNavigableMap.class,NavigableMap的实现类,返回最接近的一个元素
- ConcurrentSkipListMap.class,它也是NavigableMap的实现类(要求元素之间可以比较),同时它比ConcurrentHashMap更加scalable——ConcurrentHashMap并不保证它的操作时间,并且你可以自己来调整它的load factor;但是ConcurrentSkipListMap可以保证O(log n)的性能,同时不能自己来调整它的并发参数,只有你确实需要快速的遍历操作,并且可以承受额外的插入开销的时候,才去使用它
- ConcurrentSkipListSet.class,和上面类似,只不过map变成了set
- CopyOnWriteArrayList.class,copy-on-write模式的array list,每当需要插入元素,不在原list上操作,而是会新建立一个list,适合读远远大于写并且写时间并苛刻的场景
- CopyOnWriteArraySet.class,和上面类似,list变成set而已
同步设备
这些类大部分都是帮助做线程之间同步的,简单描述,就像是提供了一个篱笆,线程执行到这个篱笆的时候都得等一等,等到条件满足以后再往后走。
- CountDownLatch.class,一个线程调用await方法以后,会阻塞地等待计数器被调用countDown直到变成0,功能上和下面的CyclicBarrier有点像
- CyclicBarrier.class,也是计数等待,只不过它是利用await方法本身来实现计数器“+1”的操作,一旦计数器上显示的数字达到Barrier可以打破的界限,就会抛出BrokenBarrierException,线程就可以继续往下执行;
- Semaphore.class,功能上很简单,acquire()和release()两个方法,一个尝试获取许可,一个释放许可,Semaphore构造方法提供了传入一个表示该信号量所具备的许可数量。
- Exchanger.class,这个类的实例就像是两列飞驰的火车(线程)之间开了一个神奇的小窗口,通过小窗口(exchange方法)可以让两列火车安全地交换数据。
- Phaser.class,功能上和第1、2个差不多,但是可以重用,且更加灵活,稍微有点复杂(CountDownLatch是不断-1,CyclicBarrier是不断+1,而Phaser定义了两个概念,phase和party),我在下面画了张图,希望能够帮助理解:
任何时候都有一个party的总数,即注册(registered)的party数,它可以在Phaser构造器里指定,也可以任意时刻调用方法动态增减;
每一个party都有unarrived和arrived两种状态,可以通过调用arriveXXX方法使得它从unarrived变成arrived;
每一个线程到达barrier后会等待(调用arriveAndAwaitAdvance方法),一旦所有party都到达(即arrived的party数量等于registered的数量),就会触发advance操作,同时barrier被打破,线程继续向下执行,party重新变为unarrived状态,重新等待所有party的到达;
在绝大多数情况下一个线程就只负责操控一个party的到达,因此很多文章说party指的就是线程,但是这是不准确的,因为一个线程完全可以操控多个party,只要它执行多次的arrive方法。
一个是phase,表示当前在哪一个阶段,每碰到一次barrier就会触发advance操作(触发前调用onAdvance方法),一旦越过这道barrier就会触发phase+1,这很容易理解;
另一个是party,很多文章说它就是线程数,但是其实这并不准确,它更像一个用于判断advance是否被允许发生的计数器:
给出一个Phaser使用的最简单的例子:
public
class
T {
public
static
void
main(String args[]) {
final
int
count = 3;
final
Phaser phaser = new
Phaser(count);
// 总共有3个registered parties
for(int
i = 0; i < count; i++) {
final
Thread thread = new
Thread(new
Task(phaser));
thread.start();
}
}
public
static
class
Task implements
Runnable {
private
final
Phaser phaser;
public
Task(Phaser phaser) {
this.phaser = phaser;
}
@Override
public
void
run() {
phaser.arriveAndAwaitAdvance();
// 每执行到这里,都会有一个party arrive,
//如果arrived parties等于registered parties,
//就往下继续执行,否则等待
}
}
}
原子对象
这些对象都的行为在不使用同步的情况下保证了原子性。值得一提的有两点:
1.weakCompareAndSet方法:compareAndSet方法很明确,但是这个是啥?根据JSR规范,调用weakCompareAndSet时并不能保证happen-before的一致性,因此允许存在重排序指令等等虚拟机优化导致这个操作失败(较弱的原子更新操作),但是从Java源代码看,它的实现其实和compareAndSet是一模一样的;
2.lazySet方法:延时设置变量值,这个等价于set方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能损耗,所以如果不需要立即读取设置的新值,那么此方法就很有用。
- AtomicBoolean.class
- AtomicInteger.class
- AtomicIntegerArray.class
- AtomicIntegerFieldUpdater.class
- AtomicLong.class
- AtomicLongArray.class
- AtomicLongFieldUpdater.class
- AtomicMarkableReference.class,它是用来高效表述Object-boolean这样的对象标志位数据结构的,一个对象引用+一个bit标志位
- AtomicReference.class
- AtomicReferenceArray.class
- AtomicReferenceFieldUpdater.class
- AtomicStampedReference.class,它和前面的AtomicMarkableReference类似,但是它是用来高效表述Object-int这样的“对象+版本号”数据结构,特别用于解决ABA问题
锁
- AbstractOwnableSynchronizer.class,这三个AbstractXXXSynchronizer都是为了创建锁和相关的同步器而提供的基础,锁,还有前面提到的同步设备都借用了它们的实现逻辑
- AbstractQueuedLongSynchronizer.class,AbstractOwnableSynchronizer的子类,所有的同步状态都是用long变量来维护的,而不是int,在需要64位的属性来表示状态的时候会很有用
- AbstractQueuedSynchronizer.class,为实现依赖于先进先出队列的阻塞锁和相关同步器(信号量、事件等等)提供的一个框架,它依靠int值来表示状态
- Lock.class,Lock比synchronized关键字更灵活,而且在吞吐量大的时候效率更高,根据JSR-133的定义,它happens-before的语义和synchronized关键字效果是一模一样的,它唯一的缺点似乎是缺乏了从lock到finally块中unlock这样容易遗漏的固定使用搭配的约束,除了lock和unlock方法以外,还有这样两个值得注意的方法:
- lockInterruptibly:如果当前线程没有被中断,就获取锁;否则抛出InterruptedException,并且清除中断
- tryLock,只在锁空闲的时候才获取这个锁,否则返回false,所以它不会block代码的执行
- ReadWriteLock.class,读写锁,读写分开,读锁是共享锁,写锁是独占锁;对于读-写都要保证严格的实时性和同步性的情况,并且读频率远远大过写,使用读写锁会比普通互斥锁有更好的性能。
- ReentrantLock.class,可重入锁(lock行为可以嵌套,但是需要和unlock行为一一对应),有几点需要注意:
- 构造器支持传入一个表示是否是公平锁的boolean参数,公平锁保证一个阻塞的线程最终能够获得锁,因为是有序的,所以总是可以按照请求的顺序获得锁;不公平锁意味着后请求锁的线程可能在其前面排列的休眠线程恢复前拿到锁,这样就有可能提高并发的性能
- 还提供了一些监视锁状态的方法,比如isFair、isLocked、hasWaiters、getQueueLength等等
- ReentrantReadWriteLock.class,可重入读写锁
- Condition.class,使用锁的newCondition方法可以返回一个该锁的Condition对象,如果说锁对象是取代和增强了synchronized关键字的功能的话,那么Condition则是对象wait/notify/notifyAll方法的替代。在下面这个例子中,lock生成了两个condition,一个表示不满,一个表示不空:
- 在put方法调用的时候,需要检查数组是不是已经满了,满了的话就得等待,直到“不满”这个condition被唤醒(notFull.await());
- 在take方法调用的时候,需要检查数组是不是已经空了,如果空了就得等待,直到“不空”这个condition被唤醒(notEmpty.await()):
class
BoundedBuffer {
final
Lock lock = new
ReentrantLock();
final
Condition notFull = lock.newCondition();
final
Condition notEmpty = lock.newCondition();
final
Object[] items = new
Object[100];
int
putptr, takeptr, count;
public
void
put(Object x) throws
InterruptedException {
lock.lock();
try
{
while
(count == items.length)
notFull.await();
items[putptr] = x;
if
(++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
// 既然已经放进了元素,肯定不空了,唤醒“notEmpty”
} finally
{
lock.unlock();
}
}
public
Object take() throws
InterruptedException {
lock.lock();
try
{
while
(count == 0)
notEmpty.await();
Object x = items[takeptr];
if
(++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
// 既然已经拿走了元素,肯定不满了,唤醒“notFull”
return
x;
} finally
{
lock.unlock();
}
}
}
Fork-join框架
这是一个JDK7引入的并行框架,它把流程划分成fork(分解)+join(合并)两个步骤(怎么那么像MapReduce?),传统线程池来实现一个并行任务的时候,经常需要花费大量的时间去等待其他线程执行任务的完成,但是fork-join框架使用work stealing技术缓解了这个问题:
1.每个工作线程都有一个双端队列,当分给每个任务一个线程去执行的时候,这个任务会放到这个队列的头部;
2.当这个任务执行完毕,需要和另外一个任务的结果执行合并操作,可是那个任务却没有执行的时候,不会干等,而是把另一个任务放到队列的头部去,让它尽快执行;
3.当工作线程的队列为空,它会尝试从其他线程的队列尾部偷一个任务过来;
4.取得的任务可以被进一步分解。
- ForkJoinPool.class,ForkJoin框架的任务池,ExecutorService的实现类
- ForkJoinTask.class,Future的子类,框架任务的抽象
- ForkJoinWorkerThread.class,工作线程
- RecursiveTask.class,ForkJoinTask的实现类,compute方法有返回值,下文中有例子
- RecursiveAction.class,ForkJoinTask的实现类,compute方法无返回值,只需要覆写compute方法,对于可继续分解的子任务,调用coInvoke方法完成(参数是RecursiveAction子类对象的可变数组):
class SortTask extends RecursiveAction { final long[] array; final int lo; final int hi; private int THRESHOLD = 30; public SortTask(long[] array) { this.array = array; this.lo = 0; this.hi = array.length - 1; } public SortTask(long[] array, int lo, int hi) { this.array = array; this.lo = lo; this.hi = hi; } @Override protected void compute() { if (hi - lo < THRESHOLD) sequentiallySort(array, lo, hi); else { int pivot = partition(array, lo, hi); coInvoke(new SortTask(array, lo, pivot - 1), new SortTask(array, pivot + 1, hi)); } } private int partition(long[] array, int lo, int hi) { long x = array[hi]; int i = lo - 1; for (int j = lo; j < hi; j++) { if (array[j] <= x) { i++; swap(array, i, j); } } swap(array, i + 1, hi); return i + 1; } private void swap(long[] array, int i, int j) { if (i != j) { long temp = array[i]; array[i] = array[j]; array[j] = temp; } } private void sequentiallySort(long[] array, int lo, int hi) { Arrays.sort(array, lo, hi + 1); }} |
---|
测试的调用代码:
@Test
public
void
testSort() throws
Exception {
ForkJoinTask sort = new
SortTask(array);
ForkJoinPool fjpool = new
ForkJoinPool();
fjpool.submit(sort);
fjpool.shutdown();
fjpool.awaitTermination(30, TimeUnit.SECONDS);
assertTrue(checkSorted(array));
}
class
Fibonacci extends
RecursiveTask {
final
int
n;
Fibonacci(int
n) {
this.n = n;
}
private
int
compute(int
small) {
final
int[] results = { 1, 1, 2, 3,
5, 8, 13, 21, 34, 55, 89
};
return
results[small];
}
public
Integer compute() {
if
(n <= 10) {
return
compute(n);
}
Fibonacci f1 = new
Fibonacci(n - 1);
Fibonacci f2 = new
Fibonacci(n - 2);
f1.fork();
f2.fork();
return
f1.join() + f2.join();
}
}
RecursiveTask和RecursiveAction的区别在于它的compute是可以有返回值的,子任务的计算使用fork()方法,结果的获取使用join()方法:
执行器和线程池
这个是我曾经举过的例子:
public class FutureUsage { public static void main(String[] args) { ExecutorService executor = Executors. newSingleThreadExecutor(); Callable<Object> task = new Callable<Object>() { public Object call() throws Exception { Thread.sleep(4000); Object result = "finished"; return result; } }; Future<Object> future = executor.submit(task); System.out.println("task submitted"); try { System.out.println(future.get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } // Thread won't be destroyed. }} |
---|
线程池具备这样的优先级处理策略:
1.请求到来首先交给coreSize内的常驻线程执行
2.如果coreSize的线程全忙,任务被放到队列里面
3.如果队列放满了,会新增线程,直到达到maxSize
4.如果还是处理不过来,会把一个异常扔到RejectedExecutionHandler中去,用户可以自己设定这种情况下的最终处理策略
对于大于coreSize而小于maxSize的那些线程,空闲了keepAliveTime后,会被销毁。观察上面说的优先级顺序可以看到,假如说给ExecutorService一个无限长的队列,比如LinkedBlockingQueue,那么maxSize>coreSize就是没有意义的。
ExecutorService:
- Future.class,异步计算的结果对象,get方法会阻塞线程直至真正的结果返回
- Callable.class,用于异步执行的可执行对象,call方法有返回值,它和Runnable接口很像,都提供了在其他线程中执行的方法,二者的区别在于:
- Runnable没有返回值,Callable有
- Callable的call方法声明了异常抛出,而Runnable没有
- RunnableFuture.class,实现自Runnable和Future的子接口,成功执行run方法可以完成它自身这个Future并允许访问其结果,它把任务执行和结果对象放到一起了
- FutureTask.class,RunnableFuture的实现类,可取消的异步计算任务,仅在计算完成时才能获取结果,一旦计算完成,就不能再重新开始或取消计算;它的取消任务方法cancel(boolean mayInterruptIfRunning)接收一个boolean参数表示在取消的过程中是否需要设置中断
- Executor.class,执行提交任务的对象,只有一个execute方法
- Executors.class,辅助类和工厂类,帮助生成下面这些ExecutorService
- ExecutorService.class,Executor的子接口,管理执行异步任务的执行器,AbstractExecutorService提供了默认实现
- AbstractExecutorService.class,ExecutorService的实现类,提供执行方法的默认实现,包括:
- ① submit的几个重载方法,返回Future对象,接收Runnable或者Callable参数
- ② invokeXXX方法,这类方法返回的时候,任务都已结束,即要么全部的入参task都执行完了,要么cancel了
- ThreadPoolExecutor.class,线程池,AbstractExecutorService的子类,除了从AbstractExecutorService继承下来的①、②两类提交任务执行的方法以外,还有:
- ③ 实现自Executor接口的execute方法,接收一个Runnable参数,没有返回值
- RejectedExecutionHandler.class,当任务无法被执行的时候,定义处理逻辑的地方,前面已经提到过了
- ThreadFactory.class,线程工厂,用于创建线程
ScheduledExecutor:
- Delayed.class,延迟执行的接口,只有long getDelay(TimeUnit unit)这样一个接口方法
- ScheduledFuture.class,Delayed和Future的共同子接口
- RunnableScheduledFuture.class,ScheduledFuture和RunnableFuture的共同子接口,增加了一个方法boolean isPeriodic(),返回它是否是一个周期性任务,一个周期性任务的特点在于它可以反复执行
- ScheduledExecutorService.class,ExecutorService的子接口,它允许任务延迟执行,相应地,它返回ScheduledFuture
- ScheduledThreadPoolExecutor.class,可以延迟执行任务的线程池
CompletionService:
- CompletionService.class,它是对ExecutorService的改进,因为ExecutorService只是负责处理任务并把每个任务的结果对象(Future)给你,却并没有说要帮你“管理”这些结果对象,这就意味着你得自己建立一个对象容器存放这些结果对象,很麻烦;CompletionService像是集成了一个Queue的功能,你可以调用Queue一样的方法——poll来获取结果对象,还有一个方法是take,它和poll差不多,区别在于take方法在没有结果对象的时候会返回空,而poll方法会block住线程直到有结果对象返回
- ExecutorCompletionService.class,是CompletionService的实现类
其它:
ThreadLocalRandom.class,随机数生成器,它和Random类差不多,但是它的性能要高得多,因为它的种子内部生成后,就不再修改,而且随机对象不共享,就会减少很多消耗和争用,由于种子内部生成,因此生成随机数的方法略有不同:
ThreadLocalRandom.current().nextX(…)
(完)
出处:http://www.raychase.net/1912
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。
- Docker Registry Server 搭建,配置免费HTTPS证书,及拥有权限认证、TLS 的私有仓库
- Ubuntu 17.04 编译安装 Nginx 1.9.9 配置 https 免费证书
- 基于Metronic的Bootstrap开发框架经验总结(2)--列表分页处理和插件JSTree的使用
- Docker Image 解决镜像无法删除的问题
- Docker Hub 仓库使用,及搭建 Docker Registry
- 基于Metronic的Bootstrap开发框架经验总结(3)--下拉列表Select2插件的使用
- Docker 容器操作
- Ubuntu 17.04 x64 安装 Docker CE 初窥 Dockerfile 部署 Nginx
- 基于Metronic的Bootstrap开发框架经验总结(4)--Bootstrap图标的提取和利用
- WebView 的 input 上传照片的兼容问题
- 在 Linux 上搭建Jekyll静态博客
- 基于Metronic的Bootstrap开发框架经验总结(5)--Bootstrap文件上传插件File Input的使用
- 网易严选 App 感受 Weex 开发
- MBR勒索木马再度来袭:GoldenEye分析
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- 3. docker-compose实战--ghost app
- 2.1 Kubernetes--Pod
- 3. Kubernetes集群安装
- macOS VirtualBox 桥接模式 设置静态ip 且能和联网
- 重新初始化k8s master节点
- 5.k8s基本命令汇总
- 6. k8s + jenkins 实现持续集成(完)
- 7. 复制k8s Node节点 并重新初始化k8s-nodes2节点 (k8s连载)
- 8.k8s连载--重新生成k8s token(kubeadm join报错及解决)
- 3. dcoker容器的命令
- 4. 镜像的原理
- 5.docker容器数据卷
- 6. Dockerfile详解
- 3.docker搭建一个博客平台
- 4. Docker 私有仓库搭建