聊聊java中的哪些Map:(六)ConcurrentHashMap源码分析

时间:2022-07-24
本文章向大家介绍聊聊java中的哪些Map:(六)ConcurrentHashMap源码分析,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

在聊完HashTable和HashMap的区别之后,自然该到了聊聊ConcurrentHashMap的时间了。HashTable逐渐被废弃,就是因为ConcurrentHashMap的出现。可以想象HashMap做为一个高频使用的集合框架。如果每次使用过程中都将整个方法synchronized,这样意味着加了全局的锁。势必会导致在并发情况下的低效。因此ConcurrentHashMap的出现,改变了这种情况。

1.类结构及其成员变量

1.1 类的基本结构

ConcurrentHashMap的类结构图如下:

可以看到ConcurrentHashMap主要继承了AbstractMap并实现ConcurrentMap和Serializable接口。

/**
 * A hash table supporting full concurrency of retrievals and
 * high expected concurrency for updates. This class obeys the
 * same functional specification as {@link java.util.Hashtable}, and
 * includes versions of methods corresponding to each method of
 * {@code Hashtable}. However, even though all operations are
 * thread-safe, retrieval operations do <em>not</em> entail locking,
 * and there is <em>not</em> any support for locking the entire table
 * in a way that prevents all access.  This class is fully
 * interoperable with {@code Hashtable} in programs that rely on its
 * thread safety but not on its synchronization details.
 *
 * <p>Retrieval operations (including {@code get}) generally do not
 * block, so may overlap with update operations (including {@code put}
 * and {@code remove}). Retrievals reflect the results of the most
 * recently <em>completed</em> update operations holding upon their
 * onset. (More formally, an update operation for a given key bears a
 * <em>happens-before</em> relation with any (non-null) retrieval for
 * that key reporting the updated value.)  For aggregate operations
 * such as {@code putAll} and {@code clear}, concurrent retrievals may
 * reflect insertion or removal of only some entries.  Similarly,
 * Iterators, Spliterators and Enumerations return elements reflecting the
 * state of the hash table at some point at or since the creation of the
 * iterator/enumeration.  They do <em>not</em> throw {@link
 * java.util.ConcurrentModificationException ConcurrentModificationException}.
 * However, iterators are designed to be used by only one thread at a time.
 * Bear in mind that the results of aggregate status methods including
 * {@code size}, {@code isEmpty}, and {@code containsValue} are typically
 * useful only when a map is not undergoing concurrent updates in other threads.
 * Otherwise the results of these methods reflect transient states
 * that may be adequate for monitoring or estimation purposes, but not
 * for program control.
 *
 * <p>The table is dynamically expanded when there are too many
 * collisions (i.e., keys that have distinct hash codes but fall into
 * the same slot modulo the table size), with the expected average
 * effect of maintaining roughly two bins per mapping (corresponding
 * to a 0.75 load factor threshold for resizing). There may be much
 * variance around this average as mappings are added and removed, but
 * overall, this maintains a commonly accepted time/space tradeoff for
 * hash tables.  However, resizing this or any other kind of hash
 * table may be a relatively slow operation. When possible, it is a
 * good idea to provide a size estimate as an optional {@code
 * initialCapacity} constructor argument. An additional optional
 * {@code loadFactor} constructor argument provides a further means of
 * customizing initial table capacity by specifying the table density
 * to be used in calculating the amount of space to allocate for the
 * given number of elements.  Also, for compatibility with previous
 * versions of this class, constructors may optionally specify an
 * expected {@code concurrencyLevel} as an additional hint for
 * internal sizing.  Note that using many keys with exactly the same
 * {@code hashCode()} is a sure way to slow down performance of any
 * hash table. To ameliorate impact, when keys are {@link Comparable},
 * this class may use comparison order among keys to help break ties.
 *
 * <p>A {@link Set} projection of a ConcurrentHashMap may be created
 * (using {@link #newKeySet()} or {@link #newKeySet(int)}), or viewed
 * (using {@link #keySet(Object)} when only keys are of interest, and the
 * mapped values are (perhaps transiently) not used or all take the
 * same mapping value.
 *
 * <p>A ConcurrentHashMap can be used as scalable frequency map (a
 * form of histogram or multiset) by using {@link
 * java.util.concurrent.atomic.LongAdder} values and initializing via
 * {@link #computeIfAbsent computeIfAbsent}. For example, to add a count
 * to a {@code ConcurrentHashMap<String,LongAdder> freqs}, you can use
 * {@code freqs.computeIfAbsent(k -> new LongAdder()).increment();}
 *
 * <p>This class and its views and iterators implement all of the
 * <em>optional</em> methods of the {@link Map} and {@link Iterator}
 * interfaces.
 *
 * <p>Like {@link Hashtable} but unlike {@link HashMap}, this class
 * does <em>not</em> allow {@code null} to be used as a key or value.
 *
 * <p>ConcurrentHashMaps support a set of sequential and parallel bulk
 * operations that, unlike most {@link Stream} methods, are designed
 * to be safely, and often sensibly, applied even with maps that are
 * being concurrently updated by other threads; for example, when
 * computing a snapshot summary of the values in a shared registry.
 * There are three kinds of operation, each with four forms, accepting
 * functions with Keys, Values, Entries, and (Key, Value) arguments
 * and/or return values. Because the elements of a ConcurrentHashMap
 * are not ordered in any particular way, and may be processed in
 * different orders in different parallel executions, the correctness
 * of supplied functions should not depend on any ordering, or on any
 * other objects or values that may transiently change while
 * computation is in progress; and except for forEach actions, should
 * ideally be side-effect-free. Bulk operations on {@link java.util.Map.Entry}
 * objects do not support method {@code setValue}.
 *
 * <ul>
 * <li> forEach: Perform a given action on each element.
 * A variant form applies a given transformation on each element
 * before performing the action.</li>
 *
 * <li> search: Return the first available non-null result of
 * applying a given function on each element; skipping further
 * search when a result is found.</li>
 *
 * <li> reduce: Accumulate each element.  The supplied reduction
 * function cannot rely on ordering (more formally, it should be
 * both associative and commutative).  There are five variants:
 *
 * <ul>
 *
 * <li> Plain reductions. (There is not a form of this method for
 * (key, value) function arguments since there is no corresponding
 * return type.)</li>
 *
 * <li> Mapped reductions that accumulate the results of a given
 * function applied to each element.</li>
 *
 * <li> Reductions to scalar doubles, longs, and ints, using a
 * given basis value.</li>
 *
 * </ul>
 * </li>
 * </ul>
 *
 * <p>These bulk operations accept a {@code parallelismThreshold}
 * argument. Methods proceed sequentially if the current map size is
 * estimated to be less than the given threshold. Using a value of
 * {@code Long.MAX_VALUE} suppresses all parallelism.  Using a value
 * of {@code 1} results in maximal parallelism by partitioning into
 * enough subtasks to fully utilize the {@link
 * ForkJoinPool#commonPool()} that is used for all parallel
 * computations. Normally, you would initially choose one of these
 * extreme values, and then measure performance of using in-between
 * values that trade off overhead versus throughput.
 *
 * <p>The concurrency properties of bulk operations follow
 * from those of ConcurrentHashMap: Any non-null result returned
 * from {@code get(key)} and related access methods bears a
 * happens-before relation with the associated insertion or
 * update.  The result of any bulk operation reflects the
 * composition of these per-element relations (but is not
 * necessarily atomic with respect to the map as a whole unless it
 * is somehow known to be quiescent).  Conversely, because keys
 * and values in the map are never null, null serves as a reliable
 * atomic indicator of the current lack of any result.  To
 * maintain this property, null serves as an implicit basis for
 * all non-scalar reduction operations. For the double, long, and
 * int versions, the basis should be one that, when combined with
 * any other value, returns that other value (more formally, it
 * should be the identity element for the reduction). Most common
 * reductions have these properties; for example, computing a sum
 * with basis 0 or a minimum with basis MAX_VALUE.
 *
 * <p>Search and transformation functions provided as arguments
 * should similarly return null to indicate the lack of any result
 * (in which case it is not used). In the case of mapped
 * reductions, this also enables transformations to serve as
 * filters, returning null (or, in the case of primitive
 * specializations, the identity basis) if the element should not
 * be combined. You can create compound transformations and
 * filterings by composing them yourself under this "null means
 * there is nothing there now" rule before using them in search or
 * reduce operations.
 *
 * <p>Methods accepting and/or returning Entry arguments maintain
 * key-value associations. They may be useful for example when
 * finding the key for the greatest value. Note that "plain" Entry
 * arguments can be supplied using {@code new
 * AbstractMap.SimpleEntry(k,v)}.
 *
 * <p>Bulk operations may complete abruptly, throwing an
 * exception encountered in the application of a supplied
 * function. Bear in mind when handling such exceptions that other
 * concurrently executing functions could also have thrown
 * exceptions, or would have done so if the first exception had
 * not occurred.
 *
 * <p>Speedups for parallel compared to sequential forms are common
 * but not guaranteed.  Parallel operations involving brief functions
 * on small maps may execute more slowly than sequential forms if the
 * underlying work to parallelize the computation is more expensive
 * than the computation itself.  Similarly, parallelization may not
 * lead to much actual parallelism if all processors are busy
 * performing unrelated tasks.
 *
 * <p>All arguments to all task methods must be non-null.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <K> the type of keys maintained by this map
 * @param <V> the type of mapped values
 */
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    private static final long serialVersionUID = 7249069246763182397L;
}

我们来看看在类前面的大段注释,这个非常关键。实际上ConcurrentHashMap作者在写这个类的时候一些考量都已经放到这个地方了。 可以看到,其大意为ConcurrentHashMap是一个支持完全并发检索和高预期并发更新的哈希表。这个类遵循与HashTable相同的函数规范。并包含HashTable每个方法相对应的版本。然而,即使所有的操作都是线程安全的,但是检索操作实际上并不需要锁,这个不的意思是防止访问锁定整个表。这与采用synchronized实现的HashTable有着本质的不同。 检索操作,包含get等通常都不需要加锁。所以可能与更新操作put、remove重合。更正式的说,前一个更新操作一定对后一个操作可见。对于聚合操作putAll和clear,并发检索可能只是插入或者删除部分条目。同样的,在Iterator和Spliterator,以及枚举返回的元素反映了hash表在创建迭代器/枚举器时或者之后某个时刻的状态。他们都不会抛出ConcurrentModificationException,然而,迭代器Iterator通常被设置成为了单线程操作。聚合状态的结果方法包括,size、isEmpty、containsValue,通常是只有当map没有其他线程中进行并发更新时才有用。否则这些方法的结果反映的是瞬时状态,这对于监测或者估算可能足够,但是不能用于程序控制。 当碰撞出现得比较多的时候,会动态的扩展hashtable。比如多个key的hash取模之后落入到相同的bucket上。预期的平均效果是在每个映射保持大约两个容器,对应调整大小为0.75的负载因子。一般情况下,在添加和删除操作上会有很大的差异,但是总的来说,这会将哈希表在一个普片能接受的时间和空间开销上进行权衡。 然而,通过resize方法进行扩容是一个非常耗时的操作,如果可能,最好事先估算哈希表的大小,通过初始化容量和负载因子的构造函数进行初始化。这提供了进一步控制容量的方法。另外,为了与旧版本的类兼容,构造函数还可以选择指定期望的concurrencyLevel做为附加的内部分级提示。注意,使用许多key与完全相同的hashCode肯定会降低哈希表的性能。为了改善影响,当key为Comparable时,这个类可以使用key之间的比较顺序来帮助断开连接。 可以通过newkeySet或者newkeySet(int)来创建一个ConcurrentHashMap的Set投影。当只对key感兴趣的时候使用keyset。 ConcurrentHashMap通过使用LongAdder并且通过computeIfAbsent初始化可以作为一个可缩放频率的图(直方图或者多重集的形式)。例如,可以使用freqs.computeIfAbsent(k->new LongAdder()).increment();向ConcurrentHashMap<String,LongAdder> freqs添加一个计数。 这个类实现了Map和Iterator接口的全部方法。 与Hashtable相同的是,ConcurrenthashMap并不支持空的key和value,这与HashMap不同。 ConcurrentHashMap支持一组顺序的和并行的批量操作,与大多数Stream方法不同,这些操作被设计成安全的,而且通常是sensibly。即使在被其他线程同时更新的map上也能使用。例如,在计算共享的hash表中的值的快照摘要的时候,有三种类型的操作,每一种都有四种形式,接受带有key、value、entry或带有返回值的函数。因为ConcurrentHashMap不以任何特殊方式排序,在并行操作的过程中可能有不同的执行顺序,正确性提供的函数不应该依赖于任何排序,或者任何其他对象或者值可能暂时改变的正在进行的计算,除了forEach操作之外,应该最好是没有任何副作用。对Entry的批量操作对象不支持setValue方法。 forEarch对每个元素执行给定的操作,变体形式对每个元素应在执行之前给定转换。 search返回对每个元素应用给定函数的第一个可用的非空结果。当找到结果时,跳过进一步的搜索。 reduce:对每个元素进行累加,提供的deduce函数不能依赖于排序,更正确的说,它应该是组合和交换的。存在5种变体。 Plain reductions:由于没有相应的返回类型,因此(key,value)没有此方法的返回形式。 Mapped reductions:累加每个元素给定函数的结果。 Reductions 支持double、int以及给定的基本类型。 bulk操作允许parallelismThreshold参数。如果估算当前的map的size小于给定的阈值。使用Long.MAX_VALUE来限制所有的并发。使用1的值可以通过将足够多的子任务划分为充分利用所有并行计算的ForkJoinPool#commonPool()来实现最大的并行性。通常,一开始你会选择其中一个极端的值,然后使用中间值对开销和吞吐量进行性能测算。 bulk批量操作的并发属性继承了ConcurrentHashMap的并发属性,从get和相关访问的方法返回任何非空结果都会在与相关的插入或更新相关联的关系之前发生。 任何批量操作的结果都反映了这些元素关系的组成,但是对于map整体而言,不一定是原子的,除非知道它是静止的,相反,由于map中的key和value从不为空,可以做为当前缺少任何结果的原子指示器。 为了维护这个属性,null做为所有非标量缩减操作的隐式基础,对于double、int、long版本,其基础应该是与任何其他值组合时返回的其他值,更正式的说,它应该是用于减少的标识元素。大多数常见的reduce都有这些性质。例如,用0为基数计算和,或者用最大值MAX_VALIE计算最小值。 做为参数提供的搜索和转换函数同样应该返回null,以表示缺少任何结果,在这种情况下,不使用它。在map-reduce的情况下,这也使得转换能够充当筛选器。如果不应组合元素,则返回null。你可以创建复合的转换和筛选,方法是搜索或者reduce操作中使用他们之前,根据null表示现在没有任何内容的规则来自己组合使用。 方法接受或者返回Entry(key-value)参数,他们可能是有用的。丽日,当找到最大值的key,注意Entry条目可以使用new AbstractMap.SimpleEntry(k、v)。 bulk操作可能会突然完成,引发在应用提供函数时遇到的异常。在处理其他并发执行函数时也可能引起异常。请记住需要处理这些异常,如果第一个异常没有发生就应该这样做。 与顺序形式相比,并行形式的加速很常见,但是并不保证。如果并行计算的底层工作比计算本身更昂贵。则涉及小映射上的简短函数的并行操作可能比顺序操作执行得慢。类似的,如果所有处理器都忙于执行不相干的任务,并行化可能不会带来太多实际的并行性。 ConcurrentHashMap所有的方法的参数都是要求非空的。这个类也是java集合框架的成员之一。 在类的内部,还有一大段注释:

/*
 * Overview:
 *
 * The primary design goal of this hash table is to maintain
 * concurrent readability (typically method get(), but also
 * iterators and related methods) while minimizing update
 * contention. Secondary goals are to keep space consumption about
 * the same or better than java.util.HashMap, and to support high
 * initial insertion rates on an empty table by many threads.
 *
 * This map usually acts as a binned (bucketed) hash table.  Each
 * key-value mapping is held in a Node.  Most nodes are instances
 * of the basic Node class with hash, key, value, and next
 * fields. However, various subclasses exist: TreeNodes are
 * arranged in balanced trees, not lists.  TreeBins hold the roots
 * of sets of TreeNodes. ForwardingNodes are placed at the heads
 * of bins during resizing. ReservationNodes are used as
 * placeholders while establishing values in computeIfAbsent and
 * related methods.  The types TreeBin, ForwardingNode, and
 * ReservationNode do not hold normal user keys, values, or
 * hashes, and are readily distinguishable during search etc
 * because they have negative hash fields and null key and value
 * fields. (These special nodes are either uncommon or transient,
 * so the impact of carrying around some unused fields is
 * insignificant.)
 *
 * The table is lazily initialized to a power-of-two size upon the
 * first insertion.  Each bin in the table normally contains a
 * list of Nodes (most often, the list has only zero or one Node).
 * Table accesses require volatile/atomic reads, writes, and
 * CASes.  Because there is no other way to arrange this without
 * adding further indirections, we use intrinsics
 * (sun.misc.Unsafe) operations.
 *
 * We use the top (sign) bit of Node hash fields for control
 * purposes -- it is available anyway because of addressing
 * constraints.  Nodes with negative hash fields are specially
 * handled or ignored in map methods.
 *
 * Insertion (via put or its variants) of the first node in an
 * empty bin is performed by just CASing it to the bin.  This is
 * by far the most common case for put operations under most
 * key/hash distributions.  Other update operations (insert,
 * delete, and replace) require locks.  We do not want to waste
 * the space required to associate a distinct lock object with
 * each bin, so instead use the first node of a bin list itself as
 * a lock. Locking support for these locks relies on builtin
 * "synchronized" monitors.
 *
 * Using the first node of a list as a lock does not by itself
 * suffice though: When a node is locked, any update must first
 * validate that it is still the first node after locking it, and
 * retry if not. Because new nodes are always appended to lists,
 * once a node is first in a bin, it remains first until deleted
 * or the bin becomes invalidated (upon resizing).
 *
 * The main disadvantage of per-bin locks is that other update
 * operations on other nodes in a bin list protected by the same
 * lock can stall, for example when user equals() or mapping
 * functions take a long time.  However, statistically, under
 * random hash codes, this is not a common problem.  Ideally, the
 * frequency of nodes in bins follows a Poisson distribution
 * (http://en.wikipedia.org/wiki/Poisson_distribution) with a
 * parameter of about 0.5 on average, given the resizing threshold
 * of 0.75, although with a large variance because of resizing
 * granularity. Ignoring variance, the expected occurrences of
 * list size k are (exp(-0.5) * pow(0.5, k) / factorial(k)). The
 * first values are:
 *
 * 0:    0.60653066
 * 1:    0.30326533
 * 2:    0.07581633
 * 3:    0.01263606
 * 4:    0.00157952
 * 5:    0.00015795
 * 6:    0.00001316
 * 7:    0.00000094
 * 8:    0.00000006
 * more: less than 1 in ten million
 *
 * Lock contention probability for two threads accessing distinct
 * elements is roughly 1 / (8 * #elements) under random hashes.
 *
 * Actual hash code distributions encountered in practice
 * sometimes deviate significantly from uniform randomness.  This
 * includes the case when N > (1<<30), so some keys MUST collide.
 * Similarly for dumb or hostile usages in which multiple keys are
 * designed to have identical hash codes or ones that differs only
 * in masked-out high bits. So we use a secondary strategy that
 * applies when the number of nodes in a bin exceeds a
 * threshold. These TreeBins use a balanced tree to hold nodes (a
 * specialized form of red-black trees), bounding search time to
 * O(log N).  Each search step in a TreeBin is at least twice as
 * slow as in a regular list, but given that N cannot exceed
 * (1<<64) (before running out of addresses) this bounds search
 * steps, lock hold times, etc, to reasonable constants (roughly
 * 100 nodes inspected per operation worst case) so long as keys
 * are Comparable (which is very common -- String, Long, etc).
 * TreeBin nodes (TreeNodes) also maintain the same "next"
 * traversal pointers as regular nodes, so can be traversed in
 * iterators in the same way.
 *
 * The table is resized when occupancy exceeds a percentage
 * threshold (nominally, 0.75, but see below).  Any thread
 * noticing an overfull bin may assist in resizing after the
 * initiating thread allocates and sets up the replacement array.
 * However, rather than stalling, these other threads may proceed
 * with insertions etc.  The use of TreeBins shields us from the
 * worst case effects of overfilling while resizes are in
 * progress.  Resizing proceeds by transferring bins, one by one,
 * from the table to the next table. However, threads claim small
 * blocks of indices to transfer (via field transferIndex) before
 * doing so, reducing contention.  A generation stamp in field
 * sizeCtl ensures that resizings do not overlap. Because we are
 * using power-of-two expansion, the elements from each bin must
 * either stay at same index, or move with a power of two
 * offset. We eliminate unnecessary node creation by catching
 * cases where old nodes can be reused because their next fields
 * won't change.  On average, only about one-sixth of them need
 * cloning when a table doubles. The nodes they replace will be
 * garbage collectable as soon as they are no longer referenced by
 * any reader thread that may be in the midst of concurrently
 * traversing table.  Upon transfer, the old table bin contains
 * only a special forwarding node (with hash field "MOVED") that
 * contains the next table as its key. On encountering a
 * forwarding node, access and update operations restart, using
 * the new table.
 *
 * Each bin transfer requires its bin lock, which can stall
 * waiting for locks while resizing. However, because other
 * threads can join in and help resize rather than contend for
 * locks, average aggregate waits become shorter as resizing
 * progresses.  The transfer operation must also ensure that all
 * accessible bins in both the old and new table are usable by any
 * traversal.  This is arranged in part by proceeding from the
 * last bin (table.length - 1) up towards the first.  Upon seeing
 * a forwarding node, traversals (see class Traverser) arrange to
 * move to the new table without revisiting nodes.  To ensure that
 * no intervening nodes are skipped even when moved out of order,
 * a stack (see class TableStack) is created on first encounter of
 * a forwarding node during a traversal, to maintain its place if
 * later processing the current table. The need for these
 * save/restore mechanics is relatively rare, but when one
 * forwarding node is encountered, typically many more will be.
 * So Traversers use a simple caching scheme to avoid creating so
 * many new TableStack nodes. (Thanks to Peter Levart for
 * suggesting use of a stack here.)
 *
 * The traversal scheme also applies to partial traversals of
 * ranges of bins (via an alternate Traverser constructor)
 * to support partitioned aggregate operations.  Also, read-only
 * operations give up if ever forwarded to a null table, which
 * provides support for shutdown-style clearing, which is also not
 * currently implemented.
 *
 * Lazy table initialization minimizes footprint until first use,
 * and also avoids resizings when the first operation is from a
 * putAll, constructor with map argument, or deserialization.
 * These cases attempt to override the initial capacity settings,
 * but harmlessly fail to take effect in cases of races.
 *
 * The element count is maintained using a specialization of
 * LongAdder. We need to incorporate a specialization rather than
 * just use a LongAdder in order to access implicit
 * contention-sensing that leads to creation of multiple
 * CounterCells.  The counter mechanics avoid contention on
 * updates but can encounter cache thrashing if read too
 * frequently during concurrent access. To avoid reading so often,
 * resizing under contention is attempted only upon adding to a
 * bin already holding two or more nodes. Under uniform hash
 * distributions, the probability of this occurring at threshold
 * is around 13%, meaning that only about 1 in 8 puts check
 * threshold (and after resizing, many fewer do so).
 *
 * TreeBins use a special form of comparison for search and
 * related operations (which is the main reason we cannot use
 * existing collections such as TreeMaps). TreeBins contain
 * Comparable elements, but may contain others, as well as
 * elements that are Comparable but not necessarily Comparable for
 * the same T, so we cannot invoke compareTo among them. To handle
 * this, the tree is ordered primarily by hash value, then by
 * Comparable.compareTo order if applicable.  On lookup at a node,
 * if elements are not comparable or compare as 0 then both left
 * and right children may need to be searched in the case of tied
 * hash values. (This corresponds to the full list search that
 * would be necessary if all elements were non-Comparable and had
 * tied hashes.) On insertion, to keep a total ordering (or as
 * close as is required here) across rebalancings, we compare
 * classes and identityHashCodes as tie-breakers. The red-black
 * balancing code is updated from pre-jdk-collections
 * (http://gee.cs.oswego.edu/dl/classes/collections/RBCell.java)
 * based in turn on Cormen, Leiserson, and Rivest "Introduction to
 * Algorithms" (CLR).
 *
 * TreeBins also require an additional locking mechanism.  While
 * list traversal is always possible by readers even during
 * updates, tree traversal is not, mainly because of tree-rotations
 * that may change the root node and/or its linkages.  TreeBins
 * include a simple read-write lock mechanism parasitic on the
 * main bin-synchronization strategy: Structural adjustments
 * associated with an insertion or removal are already bin-locked
 * (and so cannot conflict with other writers) but must wait for
 * ongoing readers to finish. Since there can be only one such
 * waiter, we use a simple scheme using a single "waiter" field to
 * block writers.  However, readers need never block.  If the root
 * lock is held, they proceed along the slow traversal path (via
 * next-pointers) until the lock becomes available or the list is
 * exhausted, whichever comes first. These cases are not fast, but
 * maximize aggregate expected throughput.
 *
 * Maintaining API and serialization compatibility with previous
 * versions of this class introduces several oddities. Mainly: We
 * leave untouched but unused constructor arguments refering to
 * concurrencyLevel. We accept a loadFactor constructor argument,
 * but apply it only to initial table capacity (which is the only
 * time that we can guarantee to honor it.) We also declare an
 * unused "Segment" class that is instantiated in minimal form
 * only when serializing.
 *
 * Also, solely for compatibility with previous versions of this
 * class, it extends AbstractMap, even though all of its methods
 * are overridden, so it is just useless baggage.
 *
 * This file is organized to make things a little easier to follow
 * while reading than they might otherwise: First the main static
 * declarations and utilities, then fields, then main public
 * methods (with a few factorings of multiple public methods into
 * internal ones), then sizing methods, trees, traversers, and
 * bulk operations.
 */

这一大段注释也非常重要。其大意为:这个哈希表设计的主要目的时维护一个并发可读(通常是get方法但也包括迭代器和相关的方法)同时将update操作的争用最小化,次要目标是提供与HashMap或者比HashMap更好的空间消耗。并支持多线程在空表上的初始插入。 这个Map通常表现为由一个bucket组成的hash表。每个key-value映射都保存在一个节点中。大多数节点都是具有hash、key、value和next字段的基本节点实例。然而,这个基本节点也存在各种子类,在平衡树的情况下会以TreeNodes出现。TreeBins是TreeNode的根。ForwardingNodes在resize的过程中被放在了bin的头部。ReservationNodes被当作占位符使用。TreeBin、ForwardingNode、ReservationNode 不包含正常的key、value、hash等值,并且在搜索过程中很容易区分。因为他们具有负数的hash字段和null的key和value。这些特殊节点是不常见的,只会暂时出现,所以携带一些未使用的字段对性能和空间的影响是微不足道的。 在第一次插入的时候,hash表被懒加载的初始化为2的幂的长度。表中的每个bin通常包含一个节点列表,通常,该列表只有零个或者一个节点。表访问需要volatile/atomic读写和CAS。因为没有其他方法可以在不添加进一步的间接寻址的情况下进行排列,所以我们使用内部的sun.misc.Unsafe操作。 我们使用节点的hash字段的符号位来进行控制,由于寻址约束,它仍然可用。具有负哈希字段的节点在map方法中被特殊处理或忽略。 将第一个节点插入(通过put或者其变体)到一个空的bin中。只需要将其放入bin即可。到目前为止,这是大多数key/hash在分布式下put操作最常见的情况。其他更新操作需要锁定。(插入、删除、replace)。我们不想浪费将不同锁对象与bin关联所需的空间,因此应该使用bin列表本身的第一个节点做为锁。对这些锁的锁定支持依赖于内部的"synchronized" monitors模型。 但是,使用列表的第一个节点做为锁本身是不够的,当一个节点被锁定时,任何更新都必须验证它是否仍然是锁定后的第一个节点,如果不是,则重试。因为新的节点总是被追加到列表中,一旦一个节点是bin中的第一个,它将保持在第一个位置。直到被删除或者扩容时bin失效。 在每个bin上加锁最主要的缺点就是受同一锁保护的bin列表中的其他节点上的其他更新操作可能会暂停。例如,当使用equals或者mapping函数花费很长时间的时候。然而,在统计上,在hash足够离散的情况下,这不是一个常见的问题。在理想情况下,bin中节点的概率服从泊松分布。参数的平均值为0.5,给定的阈值为0.75,由于resize粒度的原因,差异会比较大。忽略方差。列表中k预期出现次数为:

(exp(-0.5) * pow(0.5, k) / factorial(k))

上述公式k与概率的关系见下表:

k

概率

0

0.60653066

1

0.30326533

2

0.07581633

3

0.01263606

4

0.00157952

5

0.00015795

6

0.00001316

7

0.00000094

8

0.00000006

可以看到当k为8的时候,概率不足千万分之一。 在随机的情况下,访问两个不同元素的两个线程的锁竞争的概率大概为1/8。 实际情况中的hash分布有时会偏离均匀的随机性。这包括N>(1<<30)的情况。因此某些关键点必然会出现碰撞。类似的,对于一些愚蠢的或者恶意的用法,多个key被设计为具有相同的hash,或者只有在隐藏的高位上有不同。因此我们使用第二种策略,当bin中的节点数超过阈值的时候,该策略适用。这些TreeBins使用平衡的树来保存节点。(一种特殊形式就是红黑树)。将搜索时间限定为O(log n)。TreeBin中每个搜索步骤至少是常规列表中搜索步骤的两倍。但考虑到N不能超过1<<64。(在地址用完之前)。这就把搜索步骤,锁保持时间等限制在合理的常量时间中。(每个操作最坏的情况下大约需要检查100个节点),只要key是可比较的,(常见的字符串,long等)。TreeBin节点TreeNodes也维护一个与常规节点相同的next遍历指征。因此可以在相同的迭代器中遍历。 当占用率超过某个百分比的时候,将调整表大小阈值,名义为0.75。在任何一个线程启动之后,都可以在一个满线程分配的情况下,协助重新启动一个线程。然而,这些其他的线程不是暂停的。而是可以继续插入。TreeBins的使用可以使我们免受在调整大小的时候过度填充等最坏情况。 通过将bin一个接一个的转换到另外一个table进行扩容。然而,线程在传输之前会申明要传输的索引块很小。通过字段transferIndex。从而减少争用。字段sizeCtl产生的stamp会确保resize过程不会出现重叠。因为我们使用的是二次展开的幂。所以每个bin的元素要么保持在同一索引上,要么以2次方的offset进行移动。我们通过捕捉旧节点可重用的情况来消除不必要的节点创建。因为他们的下一个字段不会更改。平均情况下,当表翻倍时,只有大约1/6的节点需要copy。被替换的这些节点一旦不再被并发遍历表中的任何线程引用,就可以进行GC。在传输时,旧表bin只包含一个特殊的转发节点,hash字段为MOVED。而不是包含下一个表做为其KEY。遇到转发节点的时候,使用新表重新启动访问和更新操作。 每次bin转移的过程都需要对其进行lock。该bin的锁可能在resize的过程中停止等待。但是由于其他线程可以加入并帮助调整大小,而不是争抢锁。因此平均大小的等待时间随着调整大小的过程而变短。转移操作还必须确保任何遍历都可以使用旧表和新表中的所有可访问的bin。这是通过从最后一个bin(table.length-1)到第一个bin进行部分安排的。遍历请参见Traverser类。安排在不重新访问节点的情况下移动到新表。为了确保即使在无序移动时也不会跳过中间节点。在遍历过程中第一次遇到转发节点时会创建一个stack。请参见TableStack。以在以后处理当前表的时候保持其位置。这些保存恢复机制的需求相对较少。但是当遇到一个转发节点的时候,这种情况就会出现得比较多。因此,迭代器使用了一种简单的缓存方案来避免创建许多新的TableStack节点。(感谢Peter Levart建议在此处使用stack)。 遍历方案还适用于部分范围的bin,通过备用的Traverser构造函数,以支持分区聚合操作。另外,只读操作如果转发到空表也将放弃,该操作提供对shutdown-style clearubg的支持,目前该功能未实现。 懒加载的初始化过程可最大程度的减少首次使用之前对空间的占用。并且当第一个操作来自putAll,具有map参数的构造函数或者进行反序列化的时候,还避免了调整大小。这些情况试图超越初始容量设置,但是在竞争的过程中无法生效。 元素的count操作采用特殊化的LongAdder来处理,我们需要合并一个特殊化的对象,而不是仅仅使用LongAdder来访问隐式的感知。从而导致创建多个CounterCells。计数器机制避免了更新时的争用。但是如果在并发访问期间读取过于频繁,则可能会遇到高速缓存崩溃的情况。为了避免如此频繁的读取,仅在添加到容纳两个或者更多节点的容器之后,才尝试在竞争的情况下进行resize。在统一的hash分布下,此事件在阈值处发生的可能性为13%,这意味着有大约八分之一的位置放置了检查阈值,调整大小后,这样做的情况要减少很多。 TreeBins使用了一种特殊的比较形式来进行搜索和相关操作,这是我们不能使用现有集合如TreeMap的主要原因。TreeBins包含Comparable元素。但可能在包含的其他元素,以及对于同一泛型T存在可比较或者不一定可比较的元素。因此可能会造成我们无法使用compareTo,为了处理此问题,该Tree主要按hash值排序,然后按Comparable.compareTo排序。在节点上查找的时候,如果元素可比较或者比较结果为0,则在绑定hash的情况下,可能需要同时搜素左右节点。这对于如果所有元素都是非可毕的并且具有散列hash值的完整列表搜素。在插入的时候,为了保持rebalancings的总顺序,或者此处要求的最为接近的值。我们用identityHashCodes做为决胜局。红黑树的平衡代码从jdk的集合框架http://gee.cs.oswego.edu/dl/classes/collections/RBCell.java更新而来。依次基于Cormen,Leiserson和Rivest算法”(CLR)。 TreeBins还需要其他的锁定机制,尽管即使在update操作的过程中可以对table进行遍历,但是无法对树进行遍历,因为树的旋转可能会更改根节点或者其链接。TreeBins包含一个主要的bin同步策略上寄生的简单的独写锁机制。与插入或者删除相关结构调整已被bin锁定,因此不能与其他的写入发生冲突。但是必须等待正在进行的写入操作完成。因此我们使用了一种简单的方案,用一个waiter字段来阻止写入,但是读操作永远不会被阻塞。如果持有根上面的锁,则它将沿着慢速遍历路径,通过next指针前进,直到锁可用或者next为空为止。以先到者为准。这些情况不是很快,但是可以加大总的吞吐量。 与此类似的早期版本为了保持API和序列化的兼容性会带来一些奇怪的问题,主要是,我们保持不变,但是未使用的构造函数参数引用了concurrencyLevel。我们接收loadFactor参数,但是仅将其应用于初始表容量,这是我们唯一可以保证兑现它的时间,我们还声明一个未使用的Segment类,该类在序列化的时候以最小形式实例化。 同样,仅出于与此类之前版本兼容性的考虑,它继承了AbstractMap,即使所有方法都被覆盖,因此没有申明实际意义。 这个类文件的组值结构使得他们在阅读的时候比在其他情况下更容易理解,首先是静态声明和实用程序,然后是字段,然后是主要的公共方法,将多个公共方法分解为多个内部的方法,然后调整大小,方法,树,迭代器和批量操作。

1.2 常量

ConcurrentHashMap定义了一些非常重要的常量:

/**
 * The largest possible table capacity.  This value must be
 * exactly 1<<30 to stay within Java array allocation and indexing
 * bounds for power of two table sizes, and is further required
 * because the top two bits of 32bit hash fields are used for
 * control purposes.
 */
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
 * The default initial table capacity.  Must be a power of 2
 * (i.e., at least 1) and at most MAXIMUM_CAPACITY.
 */
private static final int DEFAULT_CAPACITY = 16;

/**
 * The largest possible (non-power of two) array size.
 * Needed by toArray and related methods.
 */
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
 * The default concurrency level for this table. Unused but
 * defined for compatibility with previous versions of this class.
 */
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * The load factor for this table. Overrides of this value in
 * constructors affect only the initial table capacity.  The
 * actual floating point value isn't normally used -- it is
 * simpler to use expressions such as {@code n - (n >>> 2)} for
 * the associated resizing threshold.
 */
private static final float LOAD_FACTOR = 0.75f;

/**
 * The bin count threshold for using a tree rather than list for a
 * bin.  Bins are converted to trees when adding an element to a
 * bin with at least this many nodes. The value must be greater
 * than 2, and should be at least 8 to mesh with assumptions in
 * tree removal about conversion back to plain bins upon
 * shrinkage.
 */
static final int TREEIFY_THRESHOLD = 8;

/**
 * The bin count threshold for untreeifying a (split) bin during a
 * resize operation. Should be less than TREEIFY_THRESHOLD, and at
 * most 6 to mesh with shrinkage detection under removal.
 */
static final int UNTREEIFY_THRESHOLD = 6;

/**
 * The smallest table capacity for which bins may be treeified.
 * (Otherwise the table is resized if too many nodes in a bin.)
 * The value should be at least 4 * TREEIFY_THRESHOLD to avoid
 * conflicts between resizing and treeification thresholds.
 */
static final int MIN_TREEIFY_CAPACITY = 64;

/**
 * Minimum number of rebinnings per transfer step. Ranges are
 * subdivided to allow multiple resizer threads.  This value
 * serves as a lower bound to avoid resizers encountering
 * excessive memory contention.  The value should be at least
 * DEFAULT_CAPACITY.
 */
private static final int MIN_TRANSFER_STRIDE = 16;

/**
 * The number of bits used for generation stamp in sizeCtl.
 * Must be at least 6 for 32bit arrays.
 */
private static int RESIZE_STAMP_BITS = 16;

/**
 * The maximum number of threads that can help resize.
 * Must fit in 32 - RESIZE_STAMP_BITS bits.
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
 * The bit shift for recording size stamp in sizeCtl.
 */
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

/*
 * Encodings for Node hash fields. See above for explanation.
 */
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();

整理后参见下表:

常量名

默认值

说明

MAXIMUM_CAPACITY

1<<30

允许的hahs table的最大容量,此值必须为1<<30,因为ConcurrentHashMap的长度必须为2的幂,由于32位的hash值的最高两位用于控制目的,因此这里是1<<30。

DEFAULT_CAPACITY

16

默认的初始容量,这个与HashMap一致,这个初始容量必须为2的幂,这个值是个权衡的结果,为16。在HashMap中表述为1<<4。

MAX_ARRAY_SIZE

Integer.MAX_VALUE - 8

可能的最大数组大小,toArray方法调用需要。

DEFAULT_CONCURRENCY_LEVEL

16

此表的默认并发级别,已经废弃,这是1.7版本中使用的

LOAD_FACTOR

0.75f

默认的负载因子,通常不会使用这个浮点值,会用位运算的表达式替换:n - (n >>> 2) 这也是个人认为为什么负载因子位0.75的原因之一,因为可以采用这个位运算表达式计算。

TREEIFY_THRESHOLD

8

链表树化的门槛,当添加一个元素时,bin将被转换位tree的时候至少有这么多节点。这个值必须大于2,且至少为8,以符合当删除元素的时候,从树转换为链表的收缩情况。另外,在前面大段的英文描述中已经说了,当链表长度为8的概率已经低于千万分之一,这是符合泊松分布的。

UNTREEIFY_THRESHOLD

6

红黑树转为链表的阈值,当remove操作的时候,导致原有的红黑树收缩。那么红黑树的长度为6的时候将触发UNTREEIFY操作。

MIN_TREEIFY_CAPACITY

64

需要注意的是,链表树化的条件是两个,一个是链表的长度大于8,另外一个就是table的容量大于等于64,否则链表不会树化。

MIN_TRANSFER_STRIDE

16

每个转换操作最小的重新绑定数量,范围被细分为允许多个调整程序的线程,此值做为下限,以避resize过程中遇到过多的内存争用。这个值至少应该与DEFAULT_CAPACITY相等。

RESIZE_STAMP_BITS

6

用于生成STAMP的位数,对于32位的数组,至少应该为6。需要注意的是,这个地方不是final修饰,但是奇怪的是也没有提供任何的修改方法。

MAX_RESIZERS

(1 << (32 - RESIZE_STAMP_BITS)) - 1=65535

可以协助进行resize的线程的最大数量,必须与RESIZE_STAMP_BITS匹配。这个值计算出来为65535。

RESIZE_STAMP_SHIFT

32 - RESIZE_STAMP_BITS

记录size stamp 在sizeCtl中的位移。

MOVED

-1

用户forward节点的hash值

TREEBIN

-2

红黑树根节点的hash值

RESERVED

-3

用于临时的转换节点的hash值

HASH_BITS

0x7fffffff

正常节点hash的可用位

NCPU

Runtime.getRuntime().availableProcessors()

系统可用CPU数量,以限制某些资源

serialPersistentFields

用于序列化的属性

1.3 成员变量

ConcurrentHashMap的主要成员变量如下:

/**
 * The array of bins. Lazily initialized upon first insertion.
 * Size is always a power of two. Accessed directly by iterators.
 */
transient volatile Node<K,V>[] table;

/**
 * The next table to use; non-null only while resizing.
 */
private transient volatile Node<K,V>[] nextTable;

/**
 * Base counter value, used mainly when there is no contention,
 * but also as a fallback during table initialization
 * races. Updated via CAS.
 */
private transient volatile long baseCount;

/**
 * Table initialization and resizing control.  When negative, the
 * table is being initialized or resized: -1 for initialization,
 * else -(1 + the number of active resizing threads).  Otherwise,
 * when table is null, holds the initial table size to use upon
 * creation, or 0 for default. After initialization, holds the
 * next element count value upon which to resize the table.
 */
private transient volatile int sizeCtl;

/**
 * The next table index (plus one) to split while resizing.
 */
private transient volatile int transferIndex;

/**
 * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
 */
private transient volatile int cellsBusy;

/**
 * Table of counter cells. When non-null, size is a power of 2.
 */
private transient volatile CounterCell[] counterCells;

// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

这些变量整理为如下表:

变量

类型

说明

table

transient volatile Node<K,V>[]

存储hash表的数组。需要注意的是这个地方是transient volatile,transient是因为序列化,这个与HashMap类似,而volatile则是为了实现并发的可见性。

nextTable

transient volatile Node<K,V>[]

这个与table同理,是在resize进行扩容的时候才用到。之后正常情况下还是为空

baseCount

transient volatile long

计数器,在无锁竞争的情况下使用,存在竞争的时候用作备用处理,采用cas更新。因此ConcurrentHashMap的size方法可能不是一个准确的值

sizeCtl

transient volatile int

用于表初始化调整的控件,如果为负数,则表示正在进行初始化或者调整大小,-1表示初始化状态,-(1+调整大小线程数)表示当前正在resize。当table为null的时候,保留创建时要用的初始化大小,0表示默认值。初始化之后,保存下一个要调整大小元素的计数值。

transferIndex

transient volatile int

调整大小的过程中需要进行拆分的下一个bucket的index

cellsBusy

transient volatile int

cas需要的一个控制单元

counterCells

transient volatile CounterCell[]

计数器数组,size为2的幂

keySet

transient KeySetView<K,V>

视图类keySet

values

transient ValuesView<K,V>

视图类valuesSet

entrySet

transient EntrySetView<K,V>

视图类EntrySet

实际上在1.8版本的ConcurrentHashMap中,已经没有使用分段锁,直接通过cas+synchronized在每个bucket上操作。这一点将在后面基本原理章节详细介绍。

1.4 unsafe静态代码块

在1.8版本的ConcurrentHashMap中,大量的采用了CAS机制来实现无锁化,变量sun.misc.Unsafe U就是所有cas机制使用的对象。在使用这些对象的时候,采用Unsafe.getObjectVolatile来获取,我们要是熟悉jvm线程模型的话,应该知道,每个线程在工作的过程中,会从主内存中将table copy一份到其工作内存中。虽然table为volatile修饰,但是并不能保证数据是最新的,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。 有关UnSafe的内容,本文中将不展开介绍,在后续通过专门的博文进行介绍。

// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset
            (k.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (k.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (k.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (k.getDeclaredField("cellsBusy"));
        Class<?> ck = CounterCell.class;
        CELLVALUE = U.objectFieldOffset
            (ck.getDeclaredField("value"));
        Class<?> ak = Node[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}

2.基本原理

在聊关于ConcurrentHashMap的实现原理的时候,我们需要对1.7版本的ConcurrentHashMap进行回顾。以便理解新版本的ConcurrentHashMap究竟在哪些方面做了优化。

2.1 1.7版本介绍

2.1.1 1.7版本的基本组成

在1.7版本中,ConcurrentHashMap就是一个在面试中经常被问到的地方,大家基本上都知道,ConcurrentHashMap的1.7版本采用分段锁实现。实际上分段锁如何实现,网上有很多图,个人觉得都没有描述得特别清楚。因此重新画图如下:

在ConcurrentHashMap中,其基本的构成就是Segament数组,每个Segament类,继承了ReentrantLock锁,这就是分段锁。Segament内部由HashEntry数组和count构成。如上图。熟悉HashMap源码的人,一眼就能看出,实际上1.7版本的ConcurrentHashMap的Segment就是一个小型的HashMap。这个Segment数组,再创建之后,就不会改变其大小。只能由DEFAULT_CONCURRENCY_LEVEL参数指定。

Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

初始化的Segment长度为ssize,这个ssize的计算过程:

 int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }

也就是系统会选取一个比传入的concurrencyLevel大或者等的2的幂的数。 再创建之后,Segment数据就不会改变。再默认情况下,这个数字是16,那么理论上而言,可以将原有的HashTable的性能提高16倍。 数据再put和get的时候,需要经过两次hash寻址,第一次是找到segment:

long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

通过这个u就能定位到Segment。之后再在sement定位到bucket。

((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE

实际代码是这样的:

for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
             (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
         e != null; e = e.next) {
        K k;
        if ((k = e.key) == key || (e.hash == h && key.equals(k)))
            return e.value;
    }

在for循环中第二次&计算得到了HshEntry。之后再遍历链表。这样得到具体的HashEntry。

2.1.1 1.7版本的弊端

我们来回顾一下旧版本的ConccurrentHashMap,默认情况下,在理论上可以将其性能提升到HashTable的16倍。但是这仅仅只是个理论情况。我们设想一下,Hash一旦冲突,势必会造成锁竞争。那么性能就会下降。而通常造成hash所谓的冲突有两种情况。一是真正的hash冲突,数据离散程度不够。另外一种情况就是,由于时采用&进行的取模运算。很容易就会出现定位到bucket相同的情况。 那么对于第一种情况,没有什么好的优化办法。原则上还是从根本上去解决问题,hash更加离散。如果用户的所有数据计算出来hashcode都一样,那么这个没有更好的办法了。对于第二种情况,我们可以想象到,HashMap的扩容的过程,如果触发了resize,那么显然,分到一个槽位的概率就会降低。那么采用分段锁的ConcurrentHashMap恰恰是这个分段锁不会随着扩容而增加。只能事先分配,如果事先分配得比较大,就会造成内存的浪费。 那么说到这里,你是不是也能灵机一动,要是咱们能把这个锁做成动态的,随着扩容而变化,将HashMap扩容的过程结合起来,是不是更加完美。对,java并发大神Doug Lea也是这么干的。实际上在1.8中的concurrentHashMap,采用了CAS+synchronized,只有在bucket不为空的时候才锁定,锁定的就是bucket上的哪个根节点。这样一来,就避免了旧版本随着数据量的上升,性能下降的问题。

2.2 1.8版本的基本原理

新版本的实现见下图:

我们可以看一下putVal方法中的实现,在新版本的大多数方法中都采用死循环加break的方式:

for (Node<K,V>[] tab = table;;) 

之后,先判断table是否为null或者size为0,则调用初始化方法。如果不为null,再判断bucket是否为空,为空则采用cas的方式casTabAtnew一个新的Node。如果不为空,则先判断其hash是否为MOVED状态。如果是,则调用helpTransfer。反之,在不为空且hash也正常的情况下,则用synchronized()锁定这个节点。即锁定了bucket中的根节点。

 synchronized (f) 

实际上这样就很容易明白新版本ConcurrentHashMap的原理了。只有在bucket不为空的情况下,才会用同步锁。而且随着table的resize过程,这样势必就会进一步分散锁的争用情况。这样会随着table的扩容而提升同步的性能。因为有一部分hash冲突的原因并不是hash相同。 我们将在后面的重点方法中去详细分析插入、resize等过程。 另外需要注意的是,新版本对于红黑树的处理,由于考虑到红黑树的特性,root节点会随着树的转置而变化,因此在concurrentHashMap中引入了TreeBin节点做为树化之后的root节点。synchronized就会加在这个root节点上。这样避免红黑树因为转置对root节点的影响。

3.重要的内部类

3.1 Node、TreeNode

我们来看看ConcurrentHashMap的新的内部结构:

与之前在介绍HashMap的时候类似,重点的类还是在Node和TreeNode,TreeNode的基本的红黑树的算法与HashMap类似,这就不展开说明。唯一不同的是,由于不用考虑数据的有序性问题,因此没有在继承LinkedHashMap.Entry。目前这个结构就如上图所示,还是在达到阈值之后转换为红黑树。

3.2 TreeBin

TreeBin这个类,在前面说过,实际上是为红黑树提供的root节点。之后在转为红黑树的时候,这个类才会用到,继承结构如下:

/**
 * TreeNodes used at the heads of bins. TreeBins do not hold user
 * keys or values, but instead point to list of TreeNodes and
 * their root. They also maintain a parasitic read-write lock
 * forcing writers (who hold bin lock) to wait for readers (who do
 * not) to complete before tree restructuring operations.
 */
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock

}

在TreeBin在其节点内部实现了一个简单的读写锁机制,并定义了三个状态,WRITER、WAITER、READER。实现对读写的控制。另外TreeBin还将之前HashMap之中的关于红黑树转置的所有方法都搬到了TreeBin中。这样TreeNode节点就非常简单了,内部只有find方法。关于TreeBin的细节我们可以在后续另行讨论。

3.3 ForwardingNode

这是ConcurrentHashMap在扩容的过程中出现的节点,其内部代码如下:

/**
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        //标记 死循环
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            //为null则直接return
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            //死循环
            for (;;) {
                int eh; K ek;
                //如果
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                    //如果在遍历的过程中又出现了新的resize导致当前节点是非正常节点。
                if (eh < 0) {
                //如果出现再次移动则跳转到最开始的循环
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    //否则直接下一步
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}

ForwardingNode是一个由于扩容而临时存在的节点,其内部默认的hash为MOVED。也就是说只有hash为-1的节点就是ForwardingNode节点。这将在扩容的过程中用到。

3.4 ReservationNode

这是computeIfAbsent和compute方法中需要用到的占位符节点。实际上除了hash为负数之外本身没有什么内容。

/**
 * A place-holder node used in computeIfAbsent and compute
 */
static final class ReservationNode<K,V> extends Node<K,V> {
    ReservationNode() {
        super(RESERVED, null, null, null);
    }

    Node<K,V> find(int h, Object k) {
        return null;
    }
}

RESERVED值为-3。这个节点只会在上述两个方法的时候才会存在。

3.5 Traverser、TableStack

Traverser是一个只读的迭代器,这是ConcurrentHashMap1.8版本新引入的机制。这与ConcurrentHashMap的扩容机制有关。由于旧版本的ConcurrentHashMap采用Segment锁,那么其扩容的过程中势必会锁定整个Segment。但是在新版本中,在扩容期间,采用的是多线程扩容,table的结构会产生改变,由于没有全局的锁,那么Map此时进行遍历就不具有一致性。 为了解决这个问题,1.8中采用了全新的设计,在扩容的时候,当处理完一个bucket之后,会放入一个ForwardingNode,那么Traverser就是被专门设计用来处理这种情况的类。TableStack是配合这个机制使用的。 这个解决方式也很简单,在遍历过程中,如果遇到ForwardingNode节点,那么保存当前正在遍历的数组已经索引信息到TableStack中,之后跳转到ForwardingNode指向的新的table中进行遍历,遍历完成之后再返回TableStack中取回上一次保存的结果接着遍历。 我们来看看源码:

 /**
     * Records the table, its length, and current traversal index for a
     * traverser that must process a region of a forwarded table before
     * proceeding with current table.
     */
    static final class TableStack<K,V> {
        int length;
        int index;
        Node<K,V>[] tab;
        TableStack<K,V> next;
    }

    /**
     * Encapsulates traversal for methods such as containsValue; also
     * serves as a base class for other iterators and spliterators.
     *
     * Method advance visits once each still-valid node that was
     * reachable upon iterator construction. It might miss some that
     * were added to a bin after the bin was visited, which is OK wrt
     * consistency guarantees. Maintaining this property in the face
     * of possible ongoing resizes requires a fair amount of
     * bookkeeping state that is difficult to optimize away amidst
     * volatile accesses.  Even so, traversal maintains reasonable
     * throughput.
     *
     * Normally, iteration proceeds bin-by-bin traversing lists.
     * However, if the table has been resized, then all future steps
     * must traverse both the bin at the current index as well as at
     * (index + baseSize); and so on for further resizings. To
     * paranoically cope with potential sharing by users of iterators
     * across threads, iteration terminates if a bounds checks fails
     * for a table read.
     */
    static class Traverser<K,V> {
        //当前的hashtable
        Node<K,V>[] tab;        // current table; updated if resized
        Node<K,V> next;         // the next entry to use
        TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
        int index;              // index of bin to use next
        int baseIndex;          // current index of initial table
        int baseLimit;          // index bound for initial table
        final int baseSize;     // initial table size

        Traverser(Node<K,V>[] tab, int size, int index, int limit) {
            this.tab = tab;
            this.baseSize = size;
            this.baseIndex = this.index = index;
            this.baseLimit = limit;
            this.next = null;
        }

        /**
         * Advances if possible, returning next valid node, or null if none.
         */
         //遍历前进
        final Node<K,V> advance() {
            Node<K,V> e;
            //如果不为空则前进
            if ((e = next) != null)
                e = e.next;
            //死循环
            for (;;) {
                Node<K,V>[] t; int i, n;  // must use locals in checks
                //如果不为空则next指向它
                if (e != null)
                    return next = e;
                if (baseIndex >= baseLimit || (t = tab) == null ||
                    (n = t.length) <= (i = index) || i < 0)
                    return next = null;
                if ((e = tabAt(t, i)) != null && e.hash < 0) {
                  //如果遇到FN节点
                    if (e instanceof ForwardingNode) {
                    //tab为新的table 并将之前的table和索引入栈
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        e = null;
                        pushState(t, i, n);
                        continue;
                    }
                    //遇到红黑树节点则当前节点无意义
                    else if (e instanceof TreeBin)
                        e = ((TreeBin<K,V>)e).first;
                    else
                        e = null;
                }
                //都遍历完之后再从stack中取出之前的数据
                if (stack != null)
                //如果不为空则恢复之前的遍历
                    recoverState(n);
                //反之则递增索引
                else if ((index = i + baseSize) >= n)
                    index = ++baseIndex; // visit upper slots if present
            }
        }

        /**
         * Saves traversal state upon encountering a forwarding node.
         */
         //入栈
        private void pushState(Node<K,V>[] t, int i, int n) {
            TableStack<K,V> s = spare;  // reuse if possible
            if (s != null)
                spare = s.next;
            else
                s = new TableStack<K,V>();
            s.tab = t;
            s.length = n;
            s.index = i;
            s.next = stack;
            stack = s;
        }

        /**
         * Possibly pops traversal state.
         *
         * @param n length of current table
         */
         //从栈中恢复数据
        private void recoverState(int n) {
            TableStack<K,V> s; int len;
            while ((s = stack) != null && (index += (len = s.length)) >= n) {
                n = len;
                index = s.index;
                tab = s.tab;
                s.tab = null;
                TableStack<K,V> next = s.next;
                s.next = spare; // save for reuse
                stack = next;
                spare = s;
            }
            if (s == null && (index += baseSize) >= n)
                index = ++baseIndex;
        }
    }

上面这个过程可以描述为如下图:

table遍历过程中,遇到ForwardingNode,则会先去遍历ForwardingNode的nextTable指向的扩容的table,之后再回来接着遍历之前没有遍历完成的table部分。

3.6 Iterator与Spliterator

既然Traverser的作用是为了实现遍历,那么在concurrentHashMap中,迭代器和并行迭代器都是这个类的子类。

这是EntryIterator的继承结构图,那么其他的KeyIterator、ValueIterator也都一样,继承了BaseIterator,BaseIterator再继承Traverser。 ValueSpliterator、EntrySpliterator、KeySpliterator则是直接继承了Traverser。

3.7 Segment

需要注意的是,在Java8中Segment类依然存在,但是仅仅只是在序列化和反序列化方法中使用。用作对老版本的兼容性需求。已经不具备实际的意义。

/**
 * Stripped-down version of helper class used in previous version,
 * declared for the sake of serialization compatibility
 */
static class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    final float loadFactor;
    Segment(float lf) { this.loadFactor = lf; }
}

4.重要方法

我们来看看ConcurrenthashMap中的一些重要的方法:

4.1 put

首先kankput方法:

public V put(K key, V value) {
    return putVal(key, value, false);
}

实际上底层请求的是putVal:

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
   //不支持key和value为空的情况
    if (key == null || value == null) throw new NullPointerException();
    //通过spread对hashcode统一进行处理
    int hash = spread(key.hashCode());
    int binCount = 0;
    
    //死循环
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果table为空则进行初始化table
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //反之则判断槽位是否为空
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        //如果槽位为空则cas的方式插入即可,之后退出
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //反之,如果不为空,如果在Moved状态,则通过Transfer进行
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //反之则同步锁定插入
        else {
            V oldVal = null;
            //同步锁,锁定接口
            synchronized (f) {
               //再次判断
                if (tabAt(tab, i) == f) {
                    //如果为正常节点
                    if (fh >= 0) {
                        binCount = 1;
                        //遍历到最后插入
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    如果是红黑树的root节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //红黑树插入
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            //判断是否触发树化
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //在binCount计数器中增加次数
    addCount(1L, binCount);
    return null;
}

4.2 spread

spread是ConcurrentHashMap中对所有hash都需要再次处理的方法。

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

HASH_BITS为0x7fffffff,我们在之前学过HashMap的位移,实际上可以看除,这里是将高位和低位混淆,然后保留0x7fffffff的位数。计算过程可以参见下图:

这样可以总体保证最高为为0。另外将高位部分进行混淆。

4.3 initTable

这是对Hashtable进行初始化的方法。ConcurrentHashMap与HashMap一样,在最开始,并不会初始化。采用懒加载的方式,只有在使用的时候,ConcurrentHashMap才会通过initTable进行初始化。

/**
 * Initializes table, using the size recorded in sizeCtl.
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //这个地方是while循环,因为yield的缘故
    while ((tab = table) == null || tab.length == 0) {
       //如果sizeCtl小于0则让出线程,之后如果继续执行则重新进入循环。
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //否则采用cas的方式将sizeCtl取出并于之前的比较,如果相等则设置为-1,表示当前有线程正在初始化。避免并发情况
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
               //再次判断,类似于doublecheck
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //创建数组 如果sizeCtl>0则设置为sizeCtl否则为默认16
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    将sc改为其长度的3/4。
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

上述方法完成了对table的初始化。

4.4 helpTransfer

如果正在resize的过程中,那么通过helpTransfer进行遍历,实际上利用的前文的Transfer类中间的方法。前面已经做过详细的描述。我们来看看具体代码:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    //再次确认是否应该调用helpTransfer方法,f必须是FN节点,且nextTable不为空
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        //重新生成Stamp
        int rs = resizeStamp(tab.length);
        //如果不为正常节点则说明扩容还在继续
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
               // 如果 sizeCtl 无符号右移  16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了)
            // 或者 sizeCtl == rs + 1  (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
            // 或者 sizeCtl == rs + 65535  (如果达到最大帮助线程的数量,即 65535)
            // 或者转移下标正在调整 (扩容结束)
            // 结束循环,返回 table
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
            // 进行转移
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

注意这个地方调用resizeStamp方法。这个方法代码如下:

static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

numberOfLeadingZeros返回n的二进制表示的前面为0的位数。此方法用于生成Stamp。

4.5 transfer

这个方法才是将当前槽位的节点移动到newTable中的具体方法。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
   //n为表的长度,stride为每个cpu需要处理的bucket数量
    int n = tab.length, stride;
    //如果bucket数量不是很多那么用一个线程即可,实际上stride如果小于16 就只会启动一个线程
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    //对nextTab进行初始化
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            //扩容 左移一位
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
        //如果这个过程出现异常那么将sizeCtl变成最大
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    //定义一个新的变量表示newTable的长度
    int nextn = nextTab.length;
    //占位节点
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    //遍历前进标识
    boolean advance = true;
    //是否完成标识,确保在commit nextTab之前清除
    boolean finishing = false; // to ensure sweep before committing nextTab
    //死循环 外层用于遍历bucket 内层死循环用于遍历Node
    for (int i = 0, bound = 0;;) {
    //每个线程在此进行处理
        Node<K,V> f; int fh;
        //根据前进标识再次死循环
        while (advance) {
            int nextIndex, nextBound;
            //防止每个bucket没有处理完成就前进
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;//每个线程处理的区间的下限
                i = nextIndex - 1;//每个线程处理的区间的上限
                advance = false;//前进标识为false
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {//判断是否完成扩容
                nextTable = null;//更新table,将table和newTab互换
                table = nextTab;
                //更新阈值
                sizeCtl = (n << 1) - (n >>> 1);
                return;//退出
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;//扩容结束
                finishing = advance = true;
                //在次循环check
                i = n; // recheck before commit
            }
        }
        //获得之前table中的i的bucket
        else if ((f = tabAt(tab, i)) == null)
        //如果写入fwd 则前进
            advance = casTabAt(tab, i, null, fwd);
        //如果为MOVE则说明其他线程已经在处理这个Bucket,则跳过
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
        //锁住首位节点,开始处理
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                    //首节点的hash
                        int runBit = fh & n;
                        //最后一个节点
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        //根据计算的高低位进行判断
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        //高位复用
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        //高低位处理,分别放到对应的高低位位置
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    //红黑树处理
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        //红黑树如果小于6则转为链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

上述就是扩容的完整方法。可以看到除了多线程和使用占位符节点ForwardingNode之外,大部分的原理都与HashMap差不多。ConcurrentHashMap采用的是new了一个新的table,然后逐步将旧的table上的bucket全部转到这个新的table,转移完成之后,在将这个新的table启用,改变变量指针。另外对于高低位的处理方法,与HashMap也差不多。 这个扩容方法也是ConcurrentHashMap的精华部分所在,再扩容过程中,实际上是支持多线程操作的。可能没看明白的人比较奇怪,明明没有new一个新的thread来执行,怎么就成了并发呢?实际上原理是这样的,在一开始,会计算线程数之后将bucket分批,然后当前线程会处理自己的这个区间的bucket,那么此时如果有另外一个线程也触发了扩容,由于newTable是全局的,那么也会分得另外一个没处理的区间,之后再有第三个线程,就会依次处理,因为区间的分配是通过共享标识来确定的。如果没有多线程,那么当前线程处理完之后,再进行第二个批次的处理,依次直到全部都处理完毕,这也是ConcurrentHashMap中比较难理解的地方之一。 再明白了这一点之后,不得部佩服作者对于并发的驾驭能力。

4.6 remove

remove操作底层是replaceNode方法:

public V remove(Object key) {
    return replaceNode(key, null, null);
}

代码如下:

final V replaceNode(Object key, V value, Object cv) {
    //hash扩展函数 高位参与运算
    int hash = spread(key.hashCode());
    //死循环
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
           //定位bucket
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        //如果再MOVED状态则通过helpTransfer方法
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            //锁定root节点
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        //死循环 遍历节点
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    //如果是红黑树则另行处理
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            //校验 同时addCount操作
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

再理解了扩容之后,实际上这个过程已经很简单了。

4.7 size

还需要介绍一下的是ConcurrentHashMap中的size机制。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

实际上可以看除从sumCount方法中计算:

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

可以看出,实际上sum方法是遍历CounterCell数组,再并发情况下,ConcurrentHashMap采用的计数器来操作。实际上,可能这个size得到的不是一个精确的值。关于为什么不能得到精确值的原因,后面再另行分析。 其他方法再remove或者add的之后就会触发这个计数器方法。对其进行增减。

5.总结

如上即是对ConcurrentHashMap源码的分析,由于ConcurrentHashMap源码实在是太多,涵盖的知识面也非常广。限于篇幅和能力。就不再做过多的细节介绍了。但是我们需要记住如下几点:

  • 1.ConcurrentHashMap在jdk1.8之后相比jdk1.7有本质的变化,1.7中使用了分段锁,类似与每个段都是一个小型的hashMap,而且段的数量不能动态增加,只能在初始化的时候进行设置。这样就会很不灵活。那么1.8版本则采用了更加灵活的方式,没有使用分段锁。直接synchronized的是root节点,这也间接的说明了synchronized在1.8中的性能与可重入锁实际上没有太多的区别。
  • 2.新版的ConcurrentHashMap采用了Cas+synchronized的方式,通常情况下,造成hash碰撞的情况只有两种,要么槽位数量太少不同的hashcode都分散到统一的bucket,要么hashcode完全一致。新版中完美解决了第一个问题,可以随着扩容而降低锁的粒度。增加性能。
  • 3.ConcurrentHashMap的扩容的过程是支持多线程的,采用标志位来进行,每个线程一次只会分配一定数量的bucket,此时如果有其他的线程触发了扩容就会参与进来。如果没有其他的线程,那么当前线程在处理完当前这个批次之后会接着处理以后的部分。

如上是个人的一些看法,如有不足,请补充。