ConcurrentHashMap源码学习

时间:2022-07-23
本文章向大家介绍ConcurrentHashMap源码学习,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

经过对hashMap的学习,现在我们来学习一下ConcurrentHashMap的机理。我们知道juc包是高并发工具包,按照之前提供的高并发集合,那么ConcurrentHashMap也是用来解决多线程中HashMap的问题。那么高并发中HashMap又有什么问题要让人家特意开发这样一个ConrrentHashMap里?我们知道高并发的本质是将多副本代码的运行,但是数据是公用的,也就是多副本代码多数据的操作要一致。而HashMap也不是内存可见的。那么多线程条件下肯定会有问题。在添加的时候我们知道是通过hashMap是通过key的hash值定位的,如果线程1刚定位到下标,然后线程二也定位到了这个地址然后进行了操作,那么线程1肯定会把线程2的value覆盖掉。除此之外比如在扩容的时候,线程1扩容到一半,需要重新计算下标进行移动的时候发生了阻塞,这时候线程2完成了扩容并将元素移动已经移动了新的地址空间。那么线程1的移动必然会覆盖线程2的移动结果。在jdk1.7中的扩容阶段,会导致死循环,死循环的原因主要是线程1扩容到一半,然后线程2完成了扩容,根据java中数据存储于堆的原则,线程1操作的数据不再是之前的数据。因此从逻辑上说可能会发生死循环。比如线程2移动了,线程1去移动移动的地址可能指向的时候线程2移动的地址。所以会导致死循环,而且还可能会产生数据缺失,缺失的原因是对线程2的覆盖。其基本原因还是因为死循环的产生过程。另外在java8的扩容中,要重新添加元素,如果多线程添加的话,会有一个问题。线程1添加一个元素,容量+1,在判断是否要扩容的时候它挂起了。然后线程2完成了这一步操作,之后线程1再给容量+1,再去判断,也就是两次加1操作的最终结果只是添加1而不是2.所以这样在多线程条件下肯定会有问题。虽然不太明显。但是必然会提高hash冲突的可能性了。

从类结构上看,ConcurrentHashMap的代码还是挺多的。有很多内部类也说明concurrentHashMap的功能比较复杂。当然这些内部类也是专门定制的。从类的属性上看有很多和hashmap类似的属性,但是增加了一些。那么这些增加的又是怎么组织的。还有如何做到高并发情况下数据的一致性?是使用了CAS和volitile吗?带着这些疑问,我们向concurrentHashMap开炮!

 public ConcurrentHashMap() {
     }
    //传入concurrentHashMap的容量
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
如果大于最大值得右移一位,那么就取最大值,否则进行扩展两倍,然后再求其最接近的2的次幂数据
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
     默认为16
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
设置一个最小的容量
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
设置容量
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }
可以看出初始化的过程和hashMap没啥两样。都是对限制条件的完善。
再添加元素的时候
   public V put(K key, V value) {
        return putVal(key, value, false);
    }
static final int HASH_BITS = 0x7fffffff
//这里的异或运算主要也是为了找到对应的index,因为高位在的都在,地位不一定。所以区分度还是看传入的数据。
static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }




/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) 
    //concurrentHashMap的key和value都不允许为空
     throw new NullPointerException();
计算hash值,这块与hashmap的好像不一样。
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
    初始化tab
                tab = initTable();
这里的tabAt是我们探索concurrentHashMap的结构的基础。
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
找到hash对应的index的数据,这块是CasTabAt(index),大概得意思是使用cas策略来更新这个元素,我们看到concurrentHashMap中好多数据都是用volitile修饰的。
                if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
设置成功之后,直接中断
            }
//意思是数组中没有找到
使用fh作为hash值得缓存
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
这里加锁了,线程安全。可以看出这里的锁是加载链表上的
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
   查看两次取到的值是否一样
                        if (fh >= 0) {
                            binCount = 1;
f是节点,遍历f的链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
如果已经存在的话就覆盖
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
否则的话用链表的方式添加到末尾
                                    pred.next = new Node<K,V>(hash, key,value, null);
                                    break;
                                }
                            }
                        }
//如果他的hash值为负数,那么判断是不是tree
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
更新红黑树的值
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
如果链表的长度大于8的话就进行转红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

在tabAt方法中,我们看到了熟悉的Unsafe,这块是把下标和Node数组的首地址的偏移量进行计算正式的地址。

@SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

这里的AddCount是做什么的?按理需要处理的也做完了。看字面意思应该就是怼binCount进行+1,但是又好像不是这么回事。再说这样有什么意义。但是bincount肯定是链表定的长度了。

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
这里check是链表长度
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
sc是扩容上线
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {
  新的容量
                int rs = resizeStamp(n);
                if (sc < 0) {
???
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
容量进行+1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
扩容
                        transfer(tab, nt);
                }
扩容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);

    获取总容量
                s = sumCount();
            }
        }
    }

addCount的大概就是扩容了。这块逻辑上看还是挺复杂的

在获取元素的时候。

 public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
先定位到tab,然后通过tab的hash计算到具体的链表,如果在tab这里找到的话,就直接返回。
        if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
    如果其hash值小于0,说明是红黑树哦,那么通过红黑树进行查找
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
否则遍历链表
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

综合上述:concurrentHashMap是基于分段锁实现的,其中加锁的地方在链表和树节点。如果分三层去看就是将锁添加到最后一层。这样做也是为了让添加节点没有后顾之忧。其他部分与hashMap差异不大。