读源码——Guava-Cache

时间:2022-07-23
本文章向大家介绍读源码——Guava-Cache,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

今天,听同事介绍了Cuava-cache,这是个老牌缓存了,虽然近来被Caffine的出现遮盖了风头,但依然不能掩盖它往日的辉煌,至少在我们团队,还有很多项目在使用它,索性就以它为基础,对缓存做一次总结。

名词解释

  • SoR(System-of-Record):记录系统,也就是我们的数据源,实际存储原始数据的查询速度较慢的一些介质。
  • 回源:即回到数据源头获取数据,缓存没有命中,需要去SoR读取数据,可以理解回源次数太多造成压力就是“缓存击穿”了。
  • 缓存穿透:是指缓存和数据库中都没有的数据,而用户不断发起请求,如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户很可能是攻击者,攻击会导致数据库压力过大。解决方案是:空值缓存、布隆过滤器或者布谷鸟过滤器;
  • 缓存击穿:是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力。解决方案是:加锁;
  • 缓存雪崩:是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。和缓存击穿不同的是,缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库;

Java缓存类型

  1. 堆缓存:使用java堆来缓存对象,优点是速度很快,缺点也很明显,当缓存数据量很大时,必然会给jvm的GC带来很大压力。一般可以通过设置软引用和弱引用来存储缓存对象。堆缓存一般用于存储较热的数据。常见的实现有Cuava Cache、Ehcache 3.x、MapDB;
  2. 堆外缓存:即缓存数据存储在jvm堆外的内存中;优点是:降低了GC压力,缺点是:每次读取数据都需要对数据序列化和反序列化,速度降低了不少,可以使用Ehcache 3.x、MapDB实现;
  3. 磁盘缓存:即缓存数据存储在磁盘上,在JVM重启的时候数据也还是在的,而堆缓存/堆外缓存的数据会丢失,需要重新加载。可以使用Encache、MapDB实现;
  4. 分布式缓存:以上几个缓存都存在多实例情况下数据不一致和单机容量的问题。

回收算法

  1. FIFO (First In First Out):先进先出算法,即先放入缓存的先被移除。
  2. LRU (Least Recently Used):最近最少使用算法,使用时间距离现在最久的那个被移除。
  3. LFU (Least Frequently Used):最不常用算法,一定时间段内使用次数(频率)最少的那个被移除。

缓存使用模式

  1. Cache-Aside 即业务代码围绕缓存来写,由业务代码直接维护缓存;
  2. Cache-As-SoR 即把Cache看做 SoR,所有操作都是对Cache进行,然后Cache再委托给SoR进行真实的读/写;即业务代码中只看到Cache的操作,看不到关于SOR相关的代码。有三种实现:

① read-through:代码首先调用Cache,如果Cache不命中由Cache回源到SoR,而不是业务代码(即由Cache 读SoR)。使用Read-Through 模式,需要配置一个CacheLoader组件用来回源到SoR加载源数据.Guava Cache和Ehcache 3.x都支持该模式,下面会有实现;

② write-through:被称为穿透写模式/直写模式——代码首先调用Cache 写(新增/修改)数据,然后由Cache负责写缓存和写SoR,而不是由业务代码。使用Write-Through模式需要配置--个CacheWriter组件用来回写SoR。GuavaCache没有提供支持。Ehcache3.x支持该模式。

③ write-behind:也叫Write-Back,我们称之为回写模式。不同于Write-Through是同步写SOR和Cache,Write-Behind是异步写。异步写之后可以实现批量写、合并写、延时和限流。

Guava-Cache简单实现

public class CacheTest {
    static AtomicInteger ac = new AtomicInteger(1);
    public static LoadingCache<String, String> testCache = CacheBuilder.newBuilder()
            .maximumSize(30000)//设置最大数量
            .expireAfterAccess(1, TimeUnit.MINUTES)//设置过期时间
            .concurrencyLevel(8)//设置并发级别
            .recordStats()
            .weakKeys()//key value均设置为弱引用
            .weakValues()
            .refreshAfterWrite(30, TimeUnit.SECONDS)//刷新时间
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String s) throws Exception {
                    return s + ac.getAndIncrement();
                }
            });

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(testCache.get("test"));
        System.out.println(testCache.get("test"));
        TimeUnit.SECONDS.sleep(30);
        System.out.println(testCache.get("test"));
        TimeUnit.SECONDS.sleep(30);
        System.out.println(testCache.get("test"));
    }
}

运行结果

"C:Program FilesJavajdk1.8.0_221binjava.exe"...
test1
test1
test2
test3

Process finished with exit code 0

源码解读(基于实现)

CacheBuilder基于建造者模式完成对LocalCache.LocalLoadingCache的构建。LocalCache继承于ConcurrentMap,散列表由Segment[]数组作为主体,采用AtomicReferenceArray完成Hash碰撞时候的扩展,是线程安全的。然后重点看一下它的get实现。调用栈为:

testCache.get("test")
=>LocalCache#LocalLoadingCache#get(K key)
=>LocalCache#getOrLoad(K key)
=>LocalCache#get(K key, CacheLoader<? super K, V> loader)//获取对应的segement
=>Segement#get(K key, int hash, CacheLoader<? super K, V> loader)
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
  checkNotNull(key);
  checkNotNull(loader);
  try {
    if (count != 0) { // 判断当前segement下有没有值
      ReferenceEntry<K, V> e = getEntry(key, hash);// 如果有值,就循环获取;
      if (e != null) {
        long now = map.ticker.read();//获取系统时间
        V value = getLiveValue(e, now);//判断是否过期
        if (value != null) {
          recordRead(e, now);// 更新当前的访问时间
          statsCounter.recordHits(1);//命中计数
          return scheduleRefresh(e, key, hash, value, now, loader);//刷新
        }
        ValueReference<K, V> valueReference = e.getValueReference();
        if (valueReference.isLoading()) {//如果是其他线程正在加载,就等待再返回
          return waitForLoadingValue(e, key, valueReference);
        }
      }
    }
    return lockedGetOrLoad(key, hash, loader);//重新加载
  } catch (ExecutionException ee) {
    Throwable cause = ee.getCause();
    if (cause instanceof Error) {
      throw new ExecutionError((Error) cause);
    } else if (cause instanceof RuntimeException) {
      throw new UncheckedExecutionException(cause);
    }
    throw ee;
  } finally {
    postReadCleanup();
  }
}

这个方法主要用于处理缓存值得过期和刷新。这涉及到三个参数:

  • expireAfterAccess: 当缓存项在指定的时间段内没有被读或写就会被回收。
  • expireAfterWrite:当缓存项在指定的时间段内没有更新就会被回收。
  • refreshAfterWrite:当缓存项上一次更新操作之后的多久会被刷新。

优化思路:

expireAfterAccess失效性太差,如果一直存在读或者写的话,缓存可能永远不会被更新。而expireAfterWrite则通过一个加锁的方式,只允许一个线程去回源,有效防止了缓存击穿。但是,可以预见的是,而且当一个线程在回源的时候,其他请求同样key的线程一部分处于一个阻塞等待的过程(waitForLoadingValue),一部分在双重加锁处等待,可以说有一些性能损耗;如果使用refreshAfterWrite,缓存值会通过scheduleRefresh(e, key, hash, value, now, loader)加载,同样保证了只有一个缓存能进入,其他缓存没有阻塞,而是使用原值。这样虽然保证了性能,但是如果某个key吞吐量低,它使用到的旧值很可能是很久之前的,不大友好。通过源码,可以看到,如果同时使用expireAfterWrite和refreshAfterWrite的话,refreshAfterWrite<expireAfterWrite,这样当最先触发refreshAfterWrite的时候,采用刷新机制,不至于带来大氛围线程阻塞,当再触发expireAfterWrite的时候,没有来得及刷新的会被置位过期(刷新会重置writeTIme)。这就有点redis惰性删除和主动删除配合的意思了,另外在软引用和弱引用的使用,可以把缓存调整到最佳状态。软引用和弱引用

 V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
  ReferenceEntry<K, V> e;
  ValueReference<K, V> valueReference = null;
  LoadingValueReference<K, V> loadingValueReference = null;
  boolean createNewEntry = true;

  lock();
  try {
    // re-read ticker once inside the lock
    long now = map.ticker.read();
    preWriteCleanup(now);

    int newCount = this.count - 1;
    AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
    int index = hash & (table.length() - 1);
    ReferenceEntry<K, V> first = table.get(index);

    for (e = first; e != null; e = e.getNext()) {
      K entryKey = e.getKey();
      if (e.getHash() == hash
          && entryKey != null
          && map.keyEquivalence.equivalent(key, entryKey)) {
        valueReference = e.getValueReference();
        if (valueReference.isLoading()) {
          createNewEntry = false;
        } else {
          V value = valueReference.get();
          if (value == null) {
            enqueueNotification(
                entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
          } else if (map.isExpired(e, now)) {
            enqueueNotification(
                entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
          } else {
            recordLockedRead(e, now);
            statsCounter.recordHits(1);
            return value;
          }
          writeQueue.remove(e);//写队列
          accessQueue.remove(e);//读队列
          this.count = newCount; // write-volatile
        }
        break;
      }
    }

    if (createNewEntry) {
      loadingValueReference = new LoadingValueReference<K, V>();

      if (e == null) {
        e = newEntry(key, hash, first);
        e.setValueReference(loadingValueReference);
        table.set(index, e);
      } else {
        e.setValueReference(loadingValueReference);
      }
    }
  } finally {
    unlock();
    postWriteCleanup();
  }

  if (createNewEntry) {
    try {
      synchronized (e) {
        return loadSync(key, hash, loadingValueReference, loader);
      }
    } finally {
      statsCounter.recordMisses(1);
    }
  } else {
    return waitForLoadingValue(e, key, valueReference);
  }
}

这是Guava-Cache实现的加载类,采用ReentrantLock完成加锁,使整个map实现一个分段锁的结构。另外,整体对future的使用灰常值得借鉴。

以上。。。