java中的reference(四): WeakReference的应用--ThreadLocal源码分析

时间:2022-07-22
本文章向大家介绍java中的reference(四): WeakReference的应用--ThreadLocal源码分析,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

实际上,在分析整个Reference包源码之前,重点关注的问题就是ThreadLocal的源码。这也是学习Reference这个系列的初衷。一开始的想法就是将ThreadLocal源码好好理解一遍。毕竟这这也是目前大多数大厂面试的高频考点。但是在打开ThreadLocal之后,发现最关键的是巧妙应用了WeakReference。虽然ThreadLocal的其他代码的巧妙程度也让人印象深刻。但是ThreadLocal绝对称得上WeakReference的经典应用,没有之一。面试必问。要想搞明白ThreadLocal必须弄清楚WeakReference。这也是这个Reference的动机之一。学习就是如此,从一个点逐渐衍生到一个面。那么看了weakReference,就会自然的看Reference的各个子类。包括在上一篇,对FinalReference的分析,这都是之前没有重点关注的冷门知识点。那么现在能放到一个整体去分析,也是一个值得高兴的事情。

1.ThreadLocal的使用

1.1 threadlocal 运行示例

看如下示例代码,我们有两个线程,a和b,线程a启动之后,sleep 2秒,从threadlocal t1中取获取person实例 p,线程b,启动之后,sleep 1秒,然后set Person的实例p到threadlocal t1中去。

	volatile static Person p = new  Person();

	static ThreadLocal<Person> t1 = new ThreadLocal<>();


	public static void main(String[] args) {
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(2);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(" thread a "+t1.get());
		}).start();

		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			t1.set(new Person());
			System.out.println(" thread b "+t1.get());
		}).start();
	}

	static class Person {
		String name;
	}

运行代码结果如下:

 thread b com.dhb.test.ThreadLocal1$Person@5058a2e0
 thread a null

Process finished with exit code 0

可以看到,thread b能获取到p,而thread a不能。这就证明了threadlocal的主要功能。threadlocal提供了一个对线程隔离的局部变量载体。

1.2 threadlocal的主要功能

可以看一下threadlocal中源码的注释:

/**
 * This class provides thread-local variables.  These variables differ from
 * their normal counterparts in that each thread that accesses one (via its
 * {@code get} or {@code set} method) has its own, independently initialized
 * copy of the variable.  {@code ThreadLocal} instances are typically private
 * static fields in classes that wish to associate state with a thread (e.g.,
 * a user ID or Transaction ID).
 *
 * <p>For example, the class below generates unique identifiers local to each
 * thread.
 * A thread's id is assigned the first time it invokes {@code ThreadId.get()}
 * and remains unchanged on subsequent calls.
 * <pre>
 * import java.util.concurrent.atomic.AtomicInteger;
 *
 * public class ThreadId {
 *     // Atomic integer containing the next thread ID to be assigned
 *     private static final AtomicInteger nextId = new AtomicInteger(0);
 *
 *     // Thread local variable containing each thread's ID
 *     private static final ThreadLocal&lt;Integer&gt; threadId =
 *         new ThreadLocal&lt;Integer&gt;() {
 *             &#64;Override protected Integer initialValue() {
 *                 return nextId.getAndIncrement();
 *         }
 *     };
 *
 *     // Returns the current thread's unique ID, assigning it if necessary
 *     public static int get() {
 *         return threadId.get();
 *     }
 * }
 * </pre>
 * <p>Each thread holds an implicit reference to its copy of a thread-local
 * variable as long as the thread is alive and the {@code ThreadLocal}
 * instance is accessible; after a thread goes away, all of its copies of
 * thread-local instances are subject to garbage collection (unless other
 * references to these copies exist).
 *
 * @author  Josh Bloch and Doug Lea
 * @since   1.2
 */

大意为,在jdk1.2版本之后,jdk提供了一个基于线程隔离的线程本地变量。每个访问的get和set方法的线程都有自己独立的变量副本。threadlocal的实例通常会设置为private static 类型,以便将一些状态和某个线程关联。(如用户编号和事务ID)。 然后提供了一个基于AtomicInteger 的demo。 对于一个threadlocal对象,每个线程在存活的周期内都保留了一个对该对象的隐式引用,这个ThreadLocal可以进行数据存取。当线程死亡的时候,线程中的所有threadLocal对象都会被GC回收(除非有其他对ThreadLocal的引用任然存在)。 这就是threadlocal的主要功能。这个功能主要用在什么地方呢?实际上,可能我们每天都在用,但是你并没有关注到而已。在spring中,基于数据库事务的的调用,spring使用连接池连接数据库,又需要在CRUD操作中把多个代码中的操作放到一个事务中的话,那么最好的办法就是,让连接与spring的线程绑定,这个线程的所有crud操作最终都在一个connection上commit。这自然可以实现这些需求,这也是spring面试的高频考点。

1.3 threadlocal提供的主要api

threadLocal的public方法表如下:

可以看到,除了构造函数之外,ThreadLocal的主要方法有,get、set、remove和基于lambda的withInitial方法。

1.3.1 get

    /**
     * Returns the value in the current thread's copy of this
     * thread-local variable.  If the variable has no value for the
     * current thread, it is first initialized to the value returned
     * by an invocation of the {@link #initialValue} method.
     *
     * @return the current thread's value of this thread-local
     */
    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }
    
        /**
     * Get the map associated with a ThreadLocal. Overridden in
     * InheritableThreadLocal.
     *
     * @param  t the current thread
     * @return the map
     */
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

可以看到,ThreadLocal内部维护了一个特殊的HashMap,这个Map存在当前线程(Thread.currentThread())的threadLocals参数中,以当前的ThreadLocal为key。通过当前threadLocal去Map中获取Entry。这个特殊的Map就是ThreadLocalMap。通过getmap方法可以知道,这个Map实际上就维护在Thread对象中。属性为threadLocals。

1.3.2 set

    /**
     * Sets the current thread's copy of this thread-local variable
     * to the specified value.  Most subclasses will have no need to
     * override this method, relying solely on the {@link #initialValue}
     * method to set the values of thread-locals.
     *
     * @param value the value to be stored in the current thread's copy of
     *        this thread-local.
     */
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
    
    /**
     * Create the map associated with a ThreadLocal. Overridden in
     * InheritableThreadLocal.
     *
     * @param t the current thread
     * @param firstValue value for the initial entry of the map
     */
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);

通过set方法的源码,我们可以看到,在set的时候,首先判断map是否为null,如果为null则调用creatMap方法,以当前传入的value创建一个以当前ThreadLocal为key的新的map。这个把当前线程的threadLocals 指向这个map。 而InheritableThreadLocal,则会对createMap重写,以实现可继承的在子类中共享的ThreadLocal。 因此可以知道,每个线程都有一个固定的threadLocals属性,这个属性指向一个ThreadLocalMap。

1.3.3 remove

  /**
     * Removes the current thread's value for this thread-local
     * variable.  If this thread-local variable is subsequently
     * {@linkplain #get read} by the current thread, its value will be
     * reinitialized by invoking its {@link #initialValue} method,
     * unless its value is {@linkplain #set set} by the current thread
     * in the interim.  This may result in multiple invocations of the
     * {@code initialValue} method in the current thread.
     *
     * @since 1.5
     */
     public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }

remove方法主要是从当前线程的ThreadLocalMap中将ThreadLocal为key的Entry移除。对于Threadlocal,如果使用完毕,则务必调用remove方法移除,以避免引起内存泄漏或者OOM。后面会对这个问题做详细分析。

1.3.4 withInitial

    /**
     * Creates a thread local variable. The initial value of the variable is
     * determined by invoking the {@code get} method on the {@code Supplier}.
     *
     * @param <S> the type of the thread local's value
     * @param supplier the supplier to be used to determine the initial value
     * @return a new thread local variable
     * @throws NullPointerException if the specified supplier is null
     * @since 1.8
     */
    public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
        return new SuppliedThreadLocal<>(supplier);
    }

这个withInitial方法是jdk1.8之后专门给lambda方式使用的的构造方法。这个方法采用Lambda方式传入实现了 Supplier 函数接口的参数。如下:

ThreadLocal<Integer> balance = ThreadLocal.withInitial(() -> 1000);

这样即可用lambda的方式进行调用。

2.ThreadLocal核心源码及其与Weakreference的关系

2.1 ThreadLocalMap结构

ThreadLocal的核心部分就是ThreadLocalMap。

    /**
     * ThreadLocalMap is a customized hash map suitable only for
     * maintaining thread local values. No operations are exported
     * outside of the ThreadLocal class. The class is package private to
     * allow declaration of fields in class Thread.  To help deal with
     * very large and long-lived usages, the hash table entries use
     * WeakReferences for keys. However, since reference queues are not
     * used, stale entries are guaranteed to be removed only when
     * the table starts running out of space.
     */
    static class ThreadLocalMap {
    
            /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
        
    ...
    
    }

可以看到,注释中说得非常明白,ThreadLocalMap是一个特定的hashMap,只适用于ThreadLocal,private修饰,做为threadLocal的内部类,无法在其他地方访问到。这个ThreadLocalMap的Entry继承了WeakReference,用以实现对value对象的长期缓存。但是,由于用户不能直接操作ReferenceQueue,而WeakReference与Key的绑定,key是ThreadLocal自身,那么Entry到Key之间就是弱引用的关系,因此,只有GC的时候这些过期不用的entry才会被删除。当entry.get()方法为null的时候,表示这个entry是过时的。

2.2 ThreadLocalMap与WeakReference的关系

从上文中可以看到,ThreadLocalMap的Entry是WeakReference的,那么,当对这个Entry中的强引用消失之后,weakReference就会被GC回收。

ThreadLocal a = new ThreadLocal();
a.set(new byte[1024*1024*10]);

以上述代码为例,其内存布局如下:

如上图所示,如果定义了一个ThreadLocal,那么在Stack上就会有两个指针,分别指向ThreadLocal和当前线程在堆上的内存地址。之后,当前的线程中的threadLocals指向这个ThreadLocalMap,而Map中的Entry,包括Key和Value,Key又通过WeakReference的方式指向了ThreadLocal。Value即是当前需要放在ThreadLocal中的值。可能是一个大的对象,以供线程内部共享。因此value强引用指向了这个value内容。 此时不难发现一个问题,就是当ThreadLocal的强引用一旦消失之后,如申明一个threadLocal变量a,此时令a=null,那么之前的threadlocal就会被GC回收。

ThreadLocal a = new ThreadLocal();
a.set(new byte[1024*1024*10]);
a = null;

此时,如果a=null,那么后面如果执行GC,会导致a被回收,而ThreadLocalMap中,这个a对应的Entry的key就会变成null,而value为10MB,并不会在这次GC中回收。这也是threadLocal可能会造成内存泄漏的原因。因此,如果有threadlocal不需要使用之后,最好的办法是使用remove将其从ThreadLocalMap中移除。

2.3 ThreadLocalMap的核心源码

我们再来详细看看ThreadLocalMap,这个关键的类,使用了很多脑洞大开的设计,值得我们在以后的编码中进行借鉴。

2.3.1 基本组成元素 Entry

    /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

Entry是ThreadLocalMap的核心,也是应用WeakReference的地方。Entry本身继承了WeakReference。之后将传入的ThreadLocal也就是key,放在了WeakReference中,这样构成了对key的WeakReference,而value则是Entry的属性,对value的指针是强引用。 其结构如下图:

引用关系如下:

2.3.2 构造函数

ThreadLocal有两个主要的构造函数,分别是创建的时候插入一个Entry和批量插入Entry构造。

2.3.2.1 ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue)

这个构造函数在使用的时候需要传入第一个key和value。ThreadLoccalMap底层的hash表的长度初始为INITIAL_CAPACITY = 16。 这个构造函数的作用域在protected。

     /**
         * The initial capacity -- MUST be a power of two.
         */
        private static final int INITIAL_CAPACITY = 16;
 /**
         * Construct a new map initially containing (firstKey, firstValue).
         * ThreadLocalMaps are constructed lazily, so we only create
         * one when we have at least one entry to put in it.
         */
        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            //初始hash表,长度为16
            table = new Entry[INITIAL_CAPACITY];
            //Hash取模运算,计算index
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            //根据hash取模得到索引位置,然后构建Entry
            table[i] = new Entry(firstKey, firstValue);
            //维护长度变量,初始为1
            size = 1;
            //设置负载因子
            setThreshold(INITIAL_CAPACITY);
        }

该方法主要配合ThreadLocal中的createMap方法使用。ThreadLocal是采用懒加载的方式,在需要的时候才会创建ThreadLocalMap,由于每个thread都有一个threadlocals来存储对应的ThreadLocalMap,不存在共享问题,因此是线程安全的,不需要加锁。 首先创建INITIAL_CAPACITY大小的Entry数组。之后将firstKey的threadLocalHashCode和(INITIAL_CAPACITY - 1)取模。之后构造一个Entry传入这个hash表计算的index处。然后对于hash表的长度,size是动态计算的,初始为1,后续每次增减会用维护的这个size变量增减。如下图:

另外还维护的负载因子threshold,是len的2/3,当size大于这个值就开始扩容。

  /**
         * Set the resize threshold to maintain at worst a 2/3 load factor.
         */
        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }
2.3.2.1 ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue)

批量构造,这种情况发生在InheritableThreadLocal的时候,一个子类要将父类全部的ThreadLocalMap继承,则会使用这个构造函数。除此之外ThreadLocal种不会用到这个构造函数。另外这个构造函数也是private的。不提供给用户访问。仅仅在createInheritedMap方法中调用。

        /**
         * Construct a new map including all Inheritable ThreadLocals
         * from given parent map. Called only by createInheritedMap.
         *
         * @param parentMap the map associated with parent thread.
         */
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            //拿到父类种的table及其长度
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            //根据父类长度设置负载因子
            setThreshold(len);
            //根据父类长度创建相同大小的hash表
            table = new Entry[len];
            //遍历赋值
            for (int j = 0; j < len; j++) {
               //通过entry判断是否为空,不为空则构造一个新的Entry
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    //拿到key
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        //重新根据hash计算index位置
                        int h = key.threadLocalHashCode & (len - 1);
                        //如果不为空则hash冲突,此时采用开放定址法重新计算
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        //长度加1
                        size++;
                    }
                }
            }
        }

childValue方法在InheritableThreadLocal实现,而ThreadLocal不支持。这个在后面InheritableThreadLocal的源码中进行讨论。

   /**
     * Method childValue is visibly defined in subclass
     * InheritableThreadLocal, but is internally defined here for the
     * sake of providing createInheritedMap factory method without
     * needing to subclass the map class in InheritableThreadLocal.
     * This technique is preferable to the alternative of embedding
     * instanceof tests in methods.
     */
    T childValue(T parentValue) {
        throw new UnsupportedOperationException();
    }

上面过程如下图:

对于hash碰撞之后使用的开放定址法使用的nextIndex将在后面进行讨论。

2.3.3 Hash及hash碰撞的处理方法

在讨论后面的set、get、remove之前,有两个基本的内容需要先理解清楚,第一个内容就是ThreadLocalMap的hash及hash碰撞的解决方法。

2.3.3.1 threadLocalHashCode的计算过程

ThreadLocalMap的hash算法主要依赖于threadLocalHashCode。其主要过程如下:

    /**
     * ThreadLocals rely on per-thread linear-probe hash maps attached
     * to each thread (Thread.threadLocals and
     * inheritableThreadLocals).  The ThreadLocal objects act as keys,
     * searched via threadLocalHashCode.  This is a custom hash code
     * (useful only within ThreadLocalMaps) that eliminates collisions
     * in the common case where consecutively constructed ThreadLocals
     * are used by the same threads, while remaining well-behaved in
     * less common cases.
     */
    private final int threadLocalHashCode = nextHashCode();
    
       /**
     * The difference between successively generated hash codes - turns
     * implicit sequential thread-local IDs into near-optimally spread
     * multiplicative hash values for power-of-two-sized tables.
     */
    private static final int HASH_INCREMENT = 0x61c88647;

    /**
     * Returns the next hash code.
     */
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

可以看到,主要的hashcode是采用0x61c88647这个魔术生成的,而不是常规的hash算法。0x61c88647这是一个特殊的数字。每次增加0x61c88647,之后取模能将碰撞的概率降到最低。这个魔数使用斐波拉契数列来实现hash算法。具体的数学原理本文无法讨论。通过如下代码进行测试:

public class MagicHashCodeTest {

	private static final int HASH_INCREMENT = 0x61c88647;


	public static void main(String[] args) {
		hashCode(16);
		hashCode(32);
		hashCode(64);
	}

	private static void hashCode(Integer length){
		int hashCode = 0;
		for(int i=0; i< length; i++){
			hashCode = i * HASH_INCREMENT+HASH_INCREMENT;
			System.out.print(hashCode & (length-1));
			System.out.print(" ");
		}
		System.out.println();
	}
}

可以看到,其结果真的很均匀:

14 5 12 3 10 1 8 15 6 13 4 11 2 9 0 
7 14 21 28 3 10 17 24 31 6 13 20 27 2 9 16 23 30 5 12 19 26 1 8 15 22 29 4 11 18 25 0 
7 14 21 28 35 42 49 56 63 6 13 20 27 34 41 48 55 62 5 12 19 26 33 40 47 54 61 4 11 18 25 32 39 46 53 60 3 10 17 24 31 38 45 52 59 2 9 16 23 30 37 44 51 58 1 8 15 22 29 36 43 50 57 0 

将hash碰撞的概率降到了最低。

2.3.3.2 hash碰撞的解决办法–开放定址法

虽然使用魔数将hash碰撞的概率降低了很多,但是,hash碰撞的可能性还是存在的。那么出现之后该如何处理呢? 参考前文: 解决哈希冲突的常用方法分析 ThreadLocalMap使用了开放定址法,即从发生冲突的那个单元起,按照一定的次序,从哈希表中找到一个空闲的单元。然后把发生冲突的元素存入到该单元。线行探查法是开放定址法中最简单的冲突处理方法,它从发生冲突的单元起,依次判断下一个单元是否为空,当达到最后一个单元时,再从表首依次判断。直到碰到空闲的单元或者探查完全部单元为止。

     /**
         * Increment i modulo len.
         */
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        /**
         * Decrement i modulo len.
         */
        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

nextIndex方法即是线性探查寻找下一个元素的方法。同样,prevIndex用来寻找上一个元素。

2.3.4 Entry过期擦除

此外,还要讨论的第二个问题是,key采用WeakReference,那么被GC回收之后,key将变成null,而value此时还在堆中继续占用内存。因此ThreadLocalMap会在每次set和get线性探测的过程中,将key为null的entry进行擦除。

2.3.4.1 指定Entry的index擦除
        /**
         * Expunge a stale entry by rehashing any possibly colliding entries
         * lying between staleSlot and the next null slot.  This also expunges
         * any other stale entries encountered before the trailing null.  See
         * Knuth, Section 6.4
         *
         * @param staleSlot index of slot known to have null key
         * @return the index of the next null slot after staleSlot
         * (all between staleSlot and this slot will have been checked
         * for expunging).
         */
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            //对指定的staleSlot进行擦除操作
            // expunge entry at staleSlot
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            //长度减少1
            size--;

            // Rehash until we encounter null
            Entry e;
            int i;
            //对假定某个相同key可能到达的下一个节点进行线性探查,之后如果entry不为空而key为空则再次进行擦除,如果entry为空则退出循环
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                 //拿到key
                ThreadLocal<?> k = e.get();
                //如果key为空,则擦除
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    //长度减1
                    size--;
                } else {
                //反之,计算当前为key的hash是否是直接命中的
                    int h = k.threadLocalHashCode & (len - 1);
                    //如果h不为k那么说明这个entry是经过线性探查的结果
                    if (h != i) {
                        tab[i] = null;

                        // Unlike Knuth 6.4 Algorithm R, we must scan until
                        // null because multiple entries could have been stale.
                        将h上线性探测是否为空,如果为空则将e写入h
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }

这个擦除过程比较复杂,当指定的hash表的索引staleSlot过期时,就将这个位置的元素擦除,之后,由于hash表使用了线性探查法,那么有可能这个指定的位置不是第一次hash命中的位置。那么就需要对这个值之后的元素也进行线性探测,将key不为null的元素前移,将key为null的元素擦除。一直要探测到entry为null则停止。之后返回这个entry为null的索引。 nextint方法其实也很简单,如果探测的i+1大于长度,则从0开始。那么实际上就等于是个环状数组。当确认某个位置的key为null需要执行过期擦除,那么需要对后面的元素进行探测。如果后面的元素的hash值计算之后与i不等,那么后面这个值可能是出现了hash碰撞,经过线性探测之后达到的,因此对这个值也需要再次检测。如果也过期,那么继续擦除,如果没过期,那么移动到他合适的位置。 探测过程如下图:

在上图中,假定开始探测的位置为2,后面的3、4都是经过开放定址之后相同的hash值插入的Entry。假定2、4为同一ThreadLocal,此时都过期了,而3为其他threadLocal此时没过期。 首先第一步就是将2的位置擦除。得到如下图:

然后进入for循环计算,nextIndex为3,此时计算3的hash计算结果与此时的i不等。那么将3移动到2。

此后进一步探测,nextIndex为4,4的key为null,那么将4擦除。

之后nextIndex为5,此时为null,结束循环,返回index为5。

2.3.4.2 批量擦除cleanSomeSlots
  /**
         * Heuristically scan some cells looking for stale entries.
         * This is invoked when either a new element is added, or
         * another stale one has been expunged. It performs a
         * logarithmic number of scans, as a balance between no
         * scanning (fast but retains garbage) and a number of scans
         * proportional to number of elements, that would find all
         * garbage but would cause some insertions to take O(n) time.
         *
         * @param i a position known NOT to hold a stale entry. The
         * scan starts at the element after i.
         *
         * @param n scan control: {@code log2(n)} cells are scanned,
         * unless a stale entry is found, in which case
         * {@code log2(table.length)-1} additional cells are scanned.
         * When called from insertions, this parameter is the number
         * of elements, but when from replaceStaleEntry, it is the
         * table length. (Note: all this could be changed to be either
         * more or less aggressive by weighting n instead of just
         * using straight log n. But this version is simple, fast, and
         * seems to work well.)
         *
         * @return true if any stale entries have been removed.
         */
        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                i = nextIndex(i, len);
                Entry e = tab[i];
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    i = expungeStaleEntry(i);
                }
            } while ( (n >>>= 1) != 0);
            return removed;
        }

这个方法循环扫描次数由第二个参数n(最大等于Entry数组长度)控制,实际为log2n,是一种折中式的扫描方式。之后具体的执行过程由expungeStaleEntry控制。 方法从第一个参数指示索引的下一个元素开始扫描,返回值为是否找到擦除元素,即STALE状态元素。

2.3.4.3 全量擦除expungeStaleEntries
 /**
         * Expunge all stale entries in the table.
         */
        private void expungeStaleEntries() {
            Entry[] tab = table;
            int len = tab.length;
            for (int j = 0; j < len; j++) {
                Entry e = tab[j];
                if (e != null && e.get() == null)
                    expungeStaleEntry(j);
            }
        }

该方法将hash表中的所有元素进行遍历,之后擦除操作。

2.3.5 set Entry

将Entry通过set的方法设置到hash表中。

        /**
         * Set the value associated with key.
         *
         * @param key the thread local object
         * @param value the value to be set
         */
         //设置Entry到hash表
        private void set(ThreadLocal<?> key, Object value) {

            // We don't use a fast path as with get() because it is at
            // least as common to use set() to create new entries as
            // it is to replace existing ones, in which case, a fast
            // path would fail more often than not.
            //在执行set的时候需要考虑三种情况,分别是 重复插入、插入的key已经过期、碰撞
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            //根据key计算得到i
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();
                //当前的key与set的key相等,且值也相等,那么是重复插入,直接替换value。
                if (k == key) {
                    e.value = value;
                    return;
                }
               //当前的key为null,则说明过期,采用替换方法replace即可
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }
            //正常考虑碰撞之后重新计算的i,这个位置可以插入
            tab[i] = new Entry(key, value);
            int sz = ++size;
            //对部分元素进行探测,clean操作,这会增加set的时间,如故擦除元素成功则不需要扩容,否则,可能需要扩容
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
            //扩容方法
                rehash();
        }

插入过程中可能会有三种情况,重复key插入;过期插入;常规插入。

  • 重复key插入 直接替换value即可。

上图红色部分极为替换之后的value。

  • 过期插入 如果插入过程中,找到的元素其key为null,则说明已过期。

之后执行插入操作:

  • 常规插入 如果遇到空的位置,能够进行常规插入,那么需要首先进行启发式擦除操作,如果擦除操作中被擦除的元素大于1,则说明插入这个Entry之后不需要扩容。此时直接插入。如果擦除操作没用擦除元素,那么需要执行rehash,判断hash表是否需要扩容。之后再进行插入。 插入前:

插入后:

2.3.6 replaceStaleEntry 替换过期Entry

再上一节中用到了一个特殊的方法,如果再插入的过程中遇到了过期的元素,那么需要执行 replaceStaleEntry方法,对过期的元素进行替换。

 /**
         * Replace a stale entry encountered during a set operation
         * with an entry for the specified key.  The value passed in
         * the value parameter is stored in the entry, whether or not
         * an entry already exists for the specified key.
         *
         * As a side effect, this method expunges all stale entries in the
         * "run" containing the stale entry.  (A run is a sequence of entries
         * between two null slots.)
         *
         * @param  key the key
         * @param  value the value to be associated with key
         * @param  staleSlot index of the first stale entry encountered while
         *         searching for key.
         */
        private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            // Back up to check for prior stale entry in current run.
            // We clean out whole runs at a time to avoid continual
            // incremental rehashing due to garbage collector freeing
            // up refs in bunches (i.e., whenever the collector runs).
            //首先向前探测,找到hash碰撞的起点,如果不存在hash碰撞,那么staleSlot不变
            int slotToExpunge = staleSlot;
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            // Find either the key or trailing null slot of run, whichever
            // occurs first
            //之后从起点开始逐个向后探测 
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                // If we find key, then we need to swap it
                // with the stale entry to maintain hash table order.
                // The newly stale slot, or any other stale slot
                // encountered above it, can then be sent to expungeStaleEntry
                // to remove or rehash all of the other entries in run.
                //如果key相同,则直接替换value,然后执行清理
                if (k == key) {
                    e.value = value;

                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    // Start expunge at preceding stale entry if it exists
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // If we didn't find stale entry on backward scan, the
                // first stale entry seen while scanning for key is the
                // first still present in the run.
                //如果没用key为null则即是需要替换的,再后续处理
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // If key not found, put new entry in stale slot
            //处理key为null的逻辑,先将value设置为null这样消除引用,便于回收。之new一个Entry在此
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            // If there are any other stale entries in run, expunge them
            //如果替换的位置发生改变,则说明已经将值设置到前面的节点上,那么后续还可能存在为空的情况,那么执行clean回收
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

这个方法比较复杂,开始以为只是一个简单的replace方法,结果发现不是。该方法的逻辑是,需要替换的这个位置,通过线性探测查找其上一个位置,一直找到起始位置进行记录,,之后再从这个位置向后探测。探测分为两种情况。如果遇到key相等的Entry,则直接替换value。如果没用,则在第一个key为空的位置,清除之后将Entry设置在此。 之后需要判断,设置Entry的位置与方法开始传入的staleSlot是否相等,如果不等,则再进行清理操作。 两种情况分别示例如下.

  • key重复 假定3为目前需要进行replace的位置:

之后探测到1为key的preIndex的起点,然后向后在4发现了key相等的位置。则直接替换value。

由于slotToExpunge!= staleSlot 此时执行clean。

clean的结果如上图。

  • key不重复 如果key不重复:

首先执行探测:

之后在第一个key为null的位置进行替换,再进行clean。

2.3.7 get Entry

get Entry的过程中有两种情况。即直接命中和出现碰撞两种情况。

2.3.7.1 getEntry
  /**
         * Get the entry associated with key.  This method
         * itself handles only the fast path: a direct hit of existing
         * key. It otherwise relays to getEntryAfterMiss.  This is
         * designed to maximize performance for direct hits, in part
         * by making this method readily inlinable.
         *
         * @param  key the thread local object
         * @return the entry associated with key, or null if no such
         */
        private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }

对命中key的hash计算的位置,判断Entry的key是否相同。如果key相同,则返回。如果不同,则需要用到另外一个方法 getEntryAfterMiss 这也是解决碰撞的方法。

2.3.7.1 getEntryAfterMiss
        /**
         * Version of getEntry method for use when key is not found in
         * its direct hash slot.
         *
         * @param  key the thread local object
         * @param  i the table index for key's hash code
         * @param  e the entry at table[i]
         * @return the entry associated with key, or null if no such
         */
        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;
            //循环 判断Entry是否为空
            while (e != null) {
                ThreadLocal<?> k = e.get();
                //如果key相等 则返回
                if (k == key)
                    return e;
                //如果key不等,k为null则进行擦除
                if (k == null)
                    expungeStaleEntry(i);
                else
                //反之则向后探测
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }

查找方法,首先判断key是否相等,如果不等,则判断key是不是为空,为空则进行擦除,之后再向后探测。 假定开始从1开始查找:

之后在3的时候为null,需要进行清理,清理之后:

然后第三个i的key相同,则返回。

2.3.8 remove

remove Entry的方法如下:

 /**
         * Remove the entry for key.
         */
        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

删除操作比较简单,根据hashcode计算的index进行判断,找到第一个key相等的位置,执行Reference的clear方法,之后进行擦除操作。

2.3.9 动态扩容机制

    /**
         * Set the resize threshold to maintain at worst a 2/3 load factor.
         */
        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }

设置负载因子为2/3。在ThreadLocalMap中,只有添加Entry的set方法才会触发扩容。

 private void set(ThreadLocal<?> key, Object value) {
     tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }
 }

在set方法中如果clean没有回收长度,且新加入的Entry会导致长度大于等于threshould触发阈值,则执行rehash方法扩容。

        /**
         * Re-pack and/or re-size the table. First scan the entire
         * table removing stale entries. If this doesn't sufficiently
         * shrink the size of the table, double the table size.
         */
        private void rehash() {
            expungeStaleEntries();

            // Use lower threshold for doubling to avoid hysteresis
            if (size >= threshold - threshold / 4)
                resize();
        }

扩容之前执行expungeStaleEntries,全表扫描清除过期元素。之后再执行resize扩容。 此时再次确认size大于3/4。

        /**
         * Double the capacity of the table.
         */
        private void resize() {
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;

            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    if (k == null) {
                        e.value = null; // Help the GC
                    } else {
                        int h = k.threadLocalHashCode & (newLen - 1);
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }

            setThreshold(newLen);
            size = count;
            table = newTab;
        }

以2的倍数进行扩容。 然后将旧的hash表的中的全部元素都按新的hash表进行映射,重新设置值。 再重新设置的过程中,如果遇到key为null则擦除。 LocalThreadMap只有扩容过程,不会收缩。因为一个ThreadLocal的变量应该在一个可控范围。

3.ThreadLocal总结

Threadlocal中使用了很多比较巧妙的设计。在此进行总结:

  • ThreadLocal中,threadLocalmap的key是Weakreference的ThreadLocal本身。在强引用消失之后会被GC回收。之后value由于是强引用不会回收,任然会在内存中。因此这依赖于我们执行threadlocal过程中get和set时的clean操作。但是这个操作不是一定会发生。因此这也是导致内存泄漏的根源。因此对于threadlocal。我们需要及时使用remove方法将我们不用的对象清除。
  • ThreadLocalMap采用魔数实现hash算法。0x61c88647 这是一个神奇的数字。通过每次加0x61c88647之后取模能尽量均匀的分布在哈希表中。
  • ThreadLocalMap 对于hash冲突采用开放定址法中的线性探测法。每次向后加1。因此这会导致每次get、set、remove、clean等操作都需要进行线性探测。
  • threadLocalMap只能扩容,不会像hashmap那样缩容。因此这也是一个导致线程内存变大的原因。

4.扩展

对于threadlocal,还有一个变体是InheritableThreadLocal,实际上还有netty也提供了类似的FastThreadLocal,其性能比threadLocal要高很多。将在后续的文章继续讨论。