AbstractQueuedSynchronizer实现示例

时间:2022-04-25
本文章向大家介绍AbstractQueuedSynchronizer实现示例,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

AbstractQueuedSynchronizer提供了一个实现锁和同步器的框架,它处理很多细节点,比如先进先出等待队列,同时,我们可以自定义同步器的一些标准,比如线程是否执行,是否等待。最常用的ReentrantLock正是通过一个继承AbstractQueuedSynchronizer的内部类来实现的。这些同步器类似于闸门,而线程正是其中的水流,当达到某一标准或限度,闸门打开放水,若不满足条件,则关闭闸门,水流进入等待状态,这就要涉及到状态的管理,AbstractQueuedSynchronizer通过一个state变量来表示状态,getState,?setState,和compareAndSetState方法则操作这一状态,实现类还可以附加一些状态。

在排他模式下(其他线程对其获取将被阻止),AbstractQueuedSynchronizer实现类一般需要重写tryAcquire,tryRelease和isHeldExclusively方法,在共享模式下,需要重写tryAcquireShared和tryReleaseShared方法,在调用acquire,release,acquireShared,releaseShared方法时,将分别调用tryAcquire,tryRelease,tryAcquireShared和tryReleaseShared来判断操作是否可以进行。

下面我通过一个tomcat源码中LimitLatch的一个小片段,让大家对此有一个初步的了解,这是一个在共享模式下的同步器,允许多个线程对它获取成功,因此需要重写tryAcquireShared,tryReleaseShared两个方法。

/**

一个同步工具类,线程可以获取有限次数(limit)的共享闭锁,之后所有后续的请求都会放进FIFO队列等待,直到其中一个闭锁被释放。

*/

public class LimitLatch {
 private class Sync extends AbstractQueuedSynchronizer {
 private static final long serialVersionUID = 1L;
 public Sync() {
  }
 @Override
 protected int tryAcquireShared(int ignored) {
 long newCount = count.incrementAndGet();
 if (!released && newCount > limit) {
 // Limit exceeded
    count.decrementAndGet();
 return -1;
   } else {
 return 1;
   }
  }
 @Override
 protected boolean tryReleaseShared(int arg) {
   count.decrementAndGet();
 return true;
  }
 }
  private final Sync sync;
 private final AtomicLong count;
 private volatile long limit;
 private volatile boolean released = false;
/**
  * Acquires a shared latch if one is available or waits for one if no shared
  * latch is current available.
  */
 public void countUpOrAwait() throws InterruptedException {
 if (log.isDebugEnabled()) {
   log.debug("Counting up[" + Thread.currentThread().getName() + "] latch=" + getCount());
  }
  sync.acquireSharedInterruptibly(1);
 }
 /**
  * Releases a shared latch, making it available for another thread to use.
   * @return the previous counter value
  */
 public long countDown() {
  sync.releaseShared(0);
 long result = getCount();
 if (log.isDebugEnabled()) {
   log.debug("Counting down[" + Thread.currentThread().getName() + "] latch=" + result);
  }
 return result;
 }

省略后面的代码.....

Latch持有三个变量:count、limit、released。countUpOnWait方法表示如果有可用的共享闭锁,则执行获取,如果没有,则等待。它调用AbstractQueuedSynchronizer的acquireSharedInterruptibly方法,该方法与acquireShared方法类似,只是当前线程中断时会抛出InterruptedException异常,提前结束对状态的获取:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
 if (Thread.interrupted())
 throw new InterruptedException();
 if (tryAcquireShared(arg) < 0)
   doAcquireSharedInterruptibly(arg);
 }

acquireSharedInterruptibly方法又调用LimitLatch内部类Sync覆盖的tryAcquireShared方法,tryAcquireShared方法必须返回一个值,决定acquireSharedInterruptibly方法能否执行,当realeased状态为false,且当前连接数目newCount大于限制limit,则返回-1,表示获取闭锁失败,线程进入等待队列,否则返回1,允许线程通过。与此类似,countDown方法调用AbstractQueuedSynchronizer的releaseShared方法:

public final boolean releaseShared(int arg) {
 if (tryReleaseShared(arg)) {
   doReleaseShared();
 return true;
  }
 return false;
 }

该方法调用Sync覆盖的tryReleaseShared方法,将当前count数目减1,然后返回ture,表示同步器处于释放状态,等待的线程将再次获取闭锁并且获取成功,因为此时调用tryAcquireShared方法,newCount值不大于limit,将返回1,线程通过。ReentrantLock的实现也是使用了类似方法,有兴趣可以看看这篇文章:

http://www.tuicool.com/articles/R7Fnaa