2018年(30)
分类: Java
2018-09-10 20:14:05
本文将深入剖析 Java7 中的 HashMap 和 ConcurrentHashMap 的源码,解析 HashMap 线程不安全的原理以及解决方案,最后以测试用例加以验证。
HashMap 的数据结构:
从上图中可以看出,HashMap 底层就是一个数组结构,数组中的每一项又是一个链表。
通过查看 JDK 中的 HashMap 源码,可以看到其构造函数有一行代码:
public HashMap(int initialCapacity, float loadFactor) { ... table = new Entry[capacity]; ... }
即创建了一个大小为 capacity 的 Entry 数组,而 Entry 的结构如下:
static class Entryimplements Map.Entry { final K key; V value; Entry next; final int hash; …… }
可以看到,Entry 是一个 static class,其中包含了 key 和 value ,也就是键值对,另外还包含了一个 next 的 Entry 指针。
public V put(K key, V value) { // 当插入第一个元素的时候,需要先初始化数组大小 if (table == EMPTY_TABLE) { inflateTable(threshold); } // 如果 key 为 null,则这个 entry 放到 table[0] 中 if (key == null) return putForNullKey(value); // key 的 hash 值 int hash = hash(key); // 找到对应的数组下标 int i = indexFor(hash, table.length); // 遍历一下对应下标处的链表,看是否有重复的 key 已经存在, // 如果有,直接覆盖,put 方法返回旧值就结束了 for (Entrye = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; // 不存在重复的 key,将此 entry 添加到链表中 addEntry(hash, key, value, i); return null; }
这里对一些方法做深入解析。
private void inflateTable(int toSize) { // 保证数组大小一定是 2^n int capacity = roundUpToPowerOf2(toSize); // 计算扩容阈值 threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1); // 初始化数组 table = new Entry[capacity]; initHashSeedAsNeeded(capacity); }
static int indexFor(int hash, int length) { // 作用等价于取模运算,但这种方式效率更高 return hash & (length-1); }
因为HashMap的底层数组长度总是 2^n,当 length 为 2 的 n 次方时,hash & (length-1) 就相当于对length取模,而且速度比直接取模要快的多。
void addEntry(int hash, K key, V value, int bucketIndex) { // 如果当前 HashMap 大小已经达到了阈值,并且新值要插入的数组位置已经有元素了,那么要扩容 if ((size >= threshold) && (null != table[bucketIndex])) { // 扩容 resize(2 * table.length); // 重新计算 hash 值 hash = (null != key) ? hash(key) : 0; // 计算扩容后的新的下标 bucketIndex = indexFor(hash, table.length); } createEntry(hash, key, value, bucketIndex); } // 永远都是在链表的表头添加新元素 void createEntry(int hash, K key, V value, int bucketIndex) { // 获取指定 bucketIndex 索引处的 Entry Entrye = table[bucketIndex]; // 将新创建的 Entry 放入 bucketIndex 索引处,并让新的 Entry 指向原来的 Entry table[bucketIndex] = new Entry<>(hash, key, value, e); size++; }
当系统决定存储 HashMap 中的 key-value 对时,完全没有考虑 Entry 中的 value,仅仅只是根据 key 来计算并决定每个 Entry 的存储位置。我们完全可以把 Map 集合中的 value 当成 key 的附属,当系统决定了 key 的存储位置之后,value 随之保存在那里即可。
随着 HashMap 中元素的数量越来越多,发生碰撞的概率将越来越大,所产生的子链长度就会越来越长,这样势必会影响 HashMap 的存取速度。为了保证 HashMap 的效率,系统必须要在某个临界点进行扩容处理,该临界点 threshold。而在 HashMap 数组扩容之后,最消耗性能的点就出现了:原数组中的数据必须重新计算其在新数组中的位置,并放进去,这就是 resize。
void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; // 若 oldCapacity 已达到最大值,直接将 threshold 设为 Integer.MAX_VALUE if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; // 直接返回 } // 否则,创建一个更大的数组 Entry[] newTable = new Entry[newCapacity]; //将每条Entry重新哈希到新的数组中 transfer(newTable, initHashSeedAsNeeded(newCapacity)); table = newTable; // 重新设定 threshold threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1); }
public V get(Object key) { // key 为 null 的话,会被放到 table[0],所以只要遍历下 table[0] 处的链表就可以了 if (key == null) return getForNullKey(); // key 非 null 的情况,详见下文 Entryentry = getEntry(key); return null == entry ? null : entry.getValue(); } final Entry getEntry(Object key) { // The number of key-value mappings contained in this map. if (size == 0) { return null; } // 根据该 key 的 hashCode 值计算它的 hash 码 int hash = (key == null) ? 0 : hash(key); // 确定数组下标,然后从头开始遍历链表,直到找到为止 for (Entry e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; //若搜索的key与查找的key相同,则返回相对应的value if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } return null; }
ConcurrentHashMap 的成员变量中,包含了一个 Segment 数组 final Segment
然后在 Segment 这个类中,包含了一个 HashEntry 的数组transient volatile HashEntry
HashEntry 中,包含了 key 和 value 以及 next 指针(类似于 HashMap 中的 Entry),所以 HashEntry 可以构成一个链表。
public class ConcurrentHashMapextends AbstractMap implements ConcurrentMap , Serializable { ... //初始的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //初始的加载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //初始的并发等级,表示当前更新线程的估计数 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //最小的segment数量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大的segment数量 static final int MAX_SEGMENTS = 1 << 16; // static final int RETRIES_BEFORE_LOCK = 2; // segments 的掩码值, key 的散列码的高位用来选择具体的 segment final int segmentMask; // 偏移量 final int segmentShift; final Segment [] segments; ... // 创建一个带有指定初始容量、加载因子和并发级别的新的空映射 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 寻找最佳匹配参数(不小于给定参数的最接近的 2^n) int sshift = 0; // 用来记录向左按位移动的次数 int ssize = 1; // 用来记录Segment数组的大小 // 计算并行级别 ssize,因为要保持并行级别是 2^n while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 若为默认值,concurrencyLevel 为 16,sshift 为 4 // 那么计算出 segmentShift 为 28,segmentMask 为 15 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // 记录每个 Segment 上要放置多少个元素 int c = initialCapacity / ssize; // 假如有余数,则Segment数量加1 if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment s0 = new Segment (loadFactor, (int)(cap * loadFactor), (HashEntry [])new HashEntry[cap]); Segment [] ss = (Segment [])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
当用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:
根据 hash 值很快就能找到相应的 Segment,之后就是 Segment 内部的 put 操作。
public V put(K key, V value) { Segments; if (value == null) throw new NullPointerException(); int hash = hash(key); // 根据 hash 值找到 Segment 数组中的位置 j // hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位, // 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标 int j = (hash >>> segmentShift) & segmentMask; // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null, // ensureSegment(j) 对 segment[j] 进行初始化 if ((s = (Segment )UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); // 插入新值到 槽 s 中 return s.put(key, hash, value, false); }
Segment 内部是由 数组+链表 组成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 先获取该 segment 的独占锁 // 每一个Segment进行put时,都会加锁 HashEntrynode = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // segment 内部的数组 HashEntry [] tab = table; // 利用 hash 值,求应该放置的数组下标 int index = (tab.length - 1) & hash; // 数组该位置处的链表的表头 HashEntry first = entryAt(tab, index); for (HashEntry e = first;;) { // 如果链头不为 null if (e != null) { K k; //如果在该链中找到相同的key,则用新值替换旧值,并退出循环 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } //如果没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else e = e.next; } else { // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。 // 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。 if (node != null) node.setNext(first); else node = new HashEntry (hash, key, value, first); int c = count + 1; // 如果超过了该 segment 的阈值,这个 segment 需要扩容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else // 没有达到阈值,将 node 放到数组 tab 的 index 位置, // 其实就是将新的节点设置成原链表的表头 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解锁 unlock(); } return oldValue; }
ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。
这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。
private SegmentensureSegment(int k) { final Segment [] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment seg; if ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { // 这里看到为什么之前要初始化 segment[0] 了, // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k] // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了 Segment proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 内部的数组 HashEntry [] tab = (HashEntry [])new HashEntry[cap]; if ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment[k] 是否被其它线程初始化了 Segment s = new Segment (lf, threshold, tab); // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出 while ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
比较简单,先找到 Segment 数组的位置,然后找到 HashEntry 数组的位置,最后顺着链表查找即可。
public V get(Object key) { Segments; // manually integrate access methods to reduce overhead HashEntry [] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment )UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry e = (HashEntry ) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
多个线程同时使用 put() 方法添加元素,若存在两个或多个 put() 的 key 发生了碰撞,那么有可能其中一个线程的数据被覆盖。
当数据要插入 HashMap 时,都会检查容量有没有超过设定的 thredhold,如果超过,则需要扩容。而多线程会导致扩容后的链表形成环形数据结构,一旦形成环形数据结构,Entry 的 next 的节点永远不为 null,就会在获取 Entry 时产生死循环。
例子可见文章《HashMap多线程死循环问题》。
不过要注意,其使用的 Java 版本既不是 7,也不是 8。在 Java7 中方法 addEntry() 添加节点到链表中是先扩容后再添加,而例子中的源码是:
void addEntry(int hash, K key, V value, int bucketIndex) { Entrye = table[bucketIndex]; // 先添加节点 table[bucketIndex] = new Entry (hash, key, value, e); // 然后扩容 if (size++ >= threshold) resize(2 * table.length); }
所以在 Java7 中此例子无效。而在 Java8 中,通过确保建新链与旧链的顺序是相同的,即可避免产生死循环。
import java.util.HashMap; import java.util.Iterator; import java.util.Map; public class HashMapTest { private final static Mapmap = new HashMap (10000); private static final Object PRESENT = new Object(); public static void main(String args[]) { long startTime; long endTime; long totalTime; for (int i = 0; i < 7500; i++) { map.put(i, PRESENT); } // 方法一 startTime = System.nanoTime(); Iterator iter1 = map.entrySet().iterator(); while (iter1.hasNext()) { Map.Entry entry = (Map.Entry) iter1.next(); Integer key = entry.getKey(); Object val = entry.getValue(); } endTime = System.nanoTime(); totalTime = endTime - startTime; System.out.println("methor1 pays " + totalTime + " ms"); // 方法二 startTime = System.nanoTime(); Iterator iter2 = map.keySet().iterator(); while (iter2.hasNext()) { Object key = iter2.next(); Object val = map.get(key); } endTime = System.nanoTime(); totalTime = endTime - startTime; System.out.println("methor2 pays " + totalTime + " ms"); } }
运行结果:
线程安全的使用 HashMap 有三种方式,分别为 Hashtable、SynchronizedMap()、ConcurrentHashMap。
使用 synchronized 来保证线程安全,几乎所有的 public 的方法都是 synchronized 的,而有些方法也是在内部通过 synchronized 代码块来实现。
通过创建一个线程安全的 Map 对象,并把它作为一个封装的对象来返回。
支持多线程对 Map 做读操作,并且不需要任何的 blocking 。这得益于 CHM 将 Map 分割成了不同的部分,在执行更新操作时只锁住一部分。根据默认的并发级别, Map 被分割成 16 个部分,并且由不同的锁控制。这意味着,同时最多可以有 16个 写线程操作 Map 。试想一下,由只能一个线程进入变成同时可由 16 个写线程同时进入(读线程几乎不受限制),性能的提升是显而易见的。但由于一些更新操作,如 put(), remove(), putAll(), clear()只锁住操作的部分,所以在检索操作不能保证返回的是最新的结果。
在迭代遍历 CHM 时, keySet 返回的 iterator 是弱一致和 fail-safe 的,可能不会返回某些最近的改变,并且在遍历过程中,如果已经遍历的数组上的内容变化了,不会抛出 ConcurrentModificationExceptoin 的异常。
什么时候使用 ConcurrentHashMap ?
CHM 适用于读者数量超过写者时,当写者数量大于等于读者时,CHM 的性能是低于 Hashtable 和 synchronizedMap 的。这是因为当锁住了整个 Map 时,读操作要等待对同一部分执行写操作的线程结束。
CHM 适用于做 cache ,在程序启动时初始化,之后可以被多个请求线程访问。
CHM 是Hashtable一个很好的替代,但要记住, CHM 的比 Hashrable 的同步性稍弱。
Java8 对 HashMap 和 ConcurrentHashMap 做了一些修改: