Chinaunix首页 | 论坛 | 博客
  • 博客访问: 174298
  • 博文数量: 30
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 342
  • 用 户 组: 普通用户
  • 注册时间: 2018-09-03 21:42
文章分类
文章存档

2018年(30)

我的朋友

分类: Java

2018-09-10 20:14:05

本文将深入剖析 Java7 中的 HashMap 和 ConcurrentHashMap 的源码,解析 HashMap 线程不安全的原理以及解决方案,最后以测试用例加以验证。

1 Java7 HashMap

HashMap 的数据结构:


从上图中可以看出,HashMap 底层就是一个数组结构,数组中的每一项又是一个链表。

通过查看 JDK 中的 HashMap 源码,可以看到其构造函数有一行代码:

public HashMap(int initialCapacity, float loadFactor) {
    ...
    table = new Entry[capacity];
    ...
}

即创建了一个大小为 capacity 的 Entry 数组,而 Entry 的结构如下:

static class Entry implements Map.Entry {
    final K key;
    V value;
    Entry next;
    final int hash;
    ……
}

可以看到,Entry 是一个 static class,其中包含了 key 和 value ,也就是键值对,另外还包含了一个 next 的 Entry 指针。

  • capacity:当前数组容量,始终保持 2^n,可以扩容,扩容后数组大小为当前的 2 倍。默认初始容量为 16。
  • loadFactor:负载因子,默认为 0.75。
  • threshold:扩容的阈值,等于 capacity * loadFactor

1.1 put过程分析

public V put(K key, V value) {
    // 当插入第一个元素的时候,需要先初始化数组大小
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    // 如果 key 为 null,则这个 entry 放到 table[0] 中
    if (key == null)
        return putForNullKey(value);
    // key 的 hash 值
    int hash = hash(key);
    // 找到对应的数组下标
    int i = indexFor(hash, table.length);
    // 遍历一下对应下标处的链表,看是否有重复的 key 已经存在,
    // 如果有,直接覆盖,put 方法返回旧值就结束了
    for (Entry e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
 
    modCount++;
    // 不存在重复的 key,将此 entry 添加到链表中
    addEntry(hash, key, value, i);
    return null;
}

这里对一些方法做深入解析。

  • 数组初始化
private void inflateTable(int toSize) {
    // 保证数组大小一定是 2^n
    int capacity = roundUpToPowerOf2(toSize);
    // 计算扩容阈值
    threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
    // 初始化数组
    table = new Entry[capacity];
    initHashSeedAsNeeded(capacity);
}
  • 找到对应的数组下标
static int indexFor(int hash, int length) {
    // 作用等价于取模运算,但这种方式效率更高
    return hash & (length-1);
}

因为HashMap的底层数组长度总是 2^n,当 length 为 2 的 n 次方时,hash & (length-1) 就相当于对length取模,而且速度比直接取模要快的多。

  • 添加节点到链表中
void addEntry(int hash, K key, V value, int bucketIndex) {
    // 如果当前 HashMap 大小已经达到了阈值,并且新值要插入的数组位置已经有元素了,那么要扩容
    if ((size >= threshold) && (null != table[bucketIndex])) {
        // 扩容
        resize(2 * table.length);
        // 重新计算 hash 值
        hash = (null != key) ? hash(key) : 0;
        // 计算扩容后的新的下标
        bucketIndex = indexFor(hash, table.length);
    }
    createEntry(hash, key, value, bucketIndex);
}
// 永远都是在链表的表头添加新元素
void createEntry(int hash, K key, V value, int bucketIndex) {
    // 获取指定 bucketIndex 索引处的 Entry
    Entry e = table[bucketIndex];
    // 将新创建的 Entry 放入 bucketIndex 索引处,并让新的 Entry 指向原来的 Entry
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

当系统决定存储 HashMap 中的 key-value 对时,完全没有考虑 Entry 中的 value,仅仅只是根据 key 来计算并决定每个 Entry 的存储位置。我们完全可以把 Map 集合中的 value 当成 key 的附属,当系统决定了 key 的存储位置之后,value 随之保存在那里即可。

  • 数组扩容

随着 HashMap 中元素的数量越来越多,发生碰撞的概率将越来越大,所产生的子链长度就会越来越长,这样势必会影响 HashMap 的存取速度。为了保证 HashMap 的效率,系统必须要在某个临界点进行扩容处理,该临界点 threshold。而在 HashMap 数组扩容之后,最消耗性能的点就出现了:原数组中的数据必须重新计算其在新数组中的位置,并放进去,这就是 resize。

void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 若 oldCapacity 已达到最大值,直接将 threshold 设为 Integer.MAX_VALUE
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return; // 直接返回
    }
    // 否则,创建一个更大的数组
    Entry[] newTable = new Entry[newCapacity];
    //将每条Entry重新哈希到新的数组中
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    // 重新设定 threshold
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

1.2 get过程分析

public V get(Object key) {
    // key 为 null 的话,会被放到 table[0],所以只要遍历下 table[0] 处的链表就可以了
    if (key == null)
        return getForNullKey();
    // key 非 null 的情况,详见下文
    Entry entry = getEntry(key);
 
    return null == entry ? null : entry.getValue();
}
final Entry getEntry(Object key) {
    // The number of key-value mappings contained in this map.
    if (size == 0) {
        return null;
    }
 
    // 根据该 key 的 hashCode 值计算它的 hash 码 
    int hash = (key == null) ? 0 : hash(key);
    // 确定数组下标,然后从头开始遍历链表,直到找到为止
    for (Entry e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        //若搜索的key与查找的key相同,则返回相对应的value
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}

2 Java7 ConcurrentHashMap

ConcurrentHashMap 的成员变量中,包含了一个 Segment 数组 final Segment[] segments;,而 Segment 是ConcurrentHashMap 的内部类。

然后在 Segment 这个类中,包含了一个 HashEntry 的数组transient volatile HashEntry[] table,而 HashEntry 也是 ConcurrentHashMap 的内部类。

HashEntry 中,包含了 key 和 value 以及 next 指针(类似于 HashMap 中的 Entry),所以 HashEntry 可以构成一个链表。

2.1 成员变量及构造函数

public class ConcurrentHashMap extends AbstractMap
        implements ConcurrentMap, Serializable { 
    ...
    //初始的容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //初始的加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //初始的并发等级,表示当前更新线程的估计数
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //最小的segment数量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //最大的segment数量
    static final int MAX_SEGMENTS = 1 << 16; 
    //
    static final int RETRIES_BEFORE_LOCK = 2;
    // segments 的掩码值, key 的散列码的高位用来选择具体的 segment
    final int segmentMask; 
    // 偏移量
    final int segmentShift; 
    final Segment[] segments; 
    ...
    // 创建一个带有指定初始容量、加载因子和并发级别的新的空映射
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // 寻找最佳匹配参数(不小于给定参数的最接近的 2^n)
        int sshift = 0; // 用来记录向左按位移动的次数
        int ssize = 1; // 用来记录Segment数组的大小
        // 计算并行级别 ssize,因为要保持并行级别是 2^n
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 若为默认值,concurrencyLevel 为 16,sshift 为 4
        // 那么计算出 segmentShift 为 28,segmentMask 为 15
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 记录每个 Segment 上要放置多少个元素
        int c = initialCapacity / ssize;
        // 假如有余数,则Segment数量加1
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment s0 =
            new Segment(loadFactor, (int)(cap * loadFactor),
                             (HashEntry[])new HashEntry[cap]);
        Segment[] ss = (Segment[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

当用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:

  • Segment 数组长度为 16,不可以扩容
  • Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
  • 这里初始化了 segment[0],其他位置还是 null,至于为什么要初始化 segment[0],后面的代码会介绍
  • 当前 segmentShift 的值为 32 – 4 = 28,segmentMask 为 16 – 1 = 15,姑且把它们简单翻译为移位数和掩码,这两个值马上就会用到

2.2 put过程分析

根据 hash 值很快就能找到相应的 Segment,之后就是 Segment 内部的 put 操作。

public V put(K key, V value) {
    Segment s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // 根据 hash 值找到 Segment 数组中的位置 j
    // hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
    // 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,
    // ensureSegment(j) 对 segment[j] 进行初始化
    if ((s = (Segment)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

Segment 内部是由 数组+链表 组成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先获取该 segment 的独占锁
    // 每一个Segment进行put时,都会加锁
    HashEntry node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 内部的数组
        HashEntry[] tab = table;
        // 利用 hash 值,求应该放置的数组下标
        int index = (tab.length - 1) & hash;
        // 数组该位置处的链表的表头
        HashEntry first = entryAt(tab, index);
        for (HashEntry e = first;;) {
            // 如果链头不为 null
            if (e != null) {
                K k;
                //如果在该链中找到相同的key,则用新值替换旧值,并退出循环
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                //如果没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else
                e = e.next;
            }
            else {
                // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
                // 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry(hash, key, value, first);
                int c = count + 1;
                // 如果超过了该 segment 的阈值,这个 segment 需要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
                    // 其实就是将新的节点设置成原链表的表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}

2.3 初始化Segment

ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。

这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。

private Segment ensureSegment(int k) {
    final Segment[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment seg;
    if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 这里看到为什么之前要初始化 segment[0] 了,
        // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
        // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了
        Segment proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        // 初始化 segment[k] 内部的数组
        HashEntry[] tab = (HashEntry[])new HashEntry[cap];
        if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck Segment[k] 是否被其它线程初始化了
            Segment s = new Segment(lf, threshold, tab);
            // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出
            while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

2.4 get过程分析

比较简单,先找到 Segment 数组的位置,然后找到 HashEntry 数组的位置,最后顺着链表查找即可。

public V get(Object key) {
    Segment s; // manually integrate access methods to reduce overhead
    HashEntry[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

3 线程不安全

3.1 哈希碰撞

多个线程同时使用 put() 方法添加元素,若存在两个或多个 put() 的 key 发生了碰撞,那么有可能其中一个线程的数据被覆盖。

3.2 扩容

当数据要插入 HashMap 时,都会检查容量有没有超过设定的 thredhold,如果超过,则需要扩容。而多线程会导致扩容后的链表形成环形数据结构,一旦形成环形数据结构,Entry 的 next 的节点永远不为 null,就会在获取 Entry 时产生死循环。

例子可见文章《HashMap多线程死循环问题》。

不过要注意,其使用的 Java 版本既不是 7,也不是 8。在 Java7 中方法 addEntry() 添加节点到链表中是先扩容后再添加,而例子中的源码是:

void addEntry(int hash, K key, V value, int bucketIndex) {
    Entry e = table[bucketIndex];
    // 先添加节点
    table[bucketIndex] = new Entry(hash, key, value, e);
    
    // 然后扩容
    if (size++ >= threshold)
        resize(2 * table.length);
}

所以在 Java7 中此例子无效。而在 Java8 中,通过确保建新链与旧链的顺序是相同的,即可避免产生死循环。

4 HashMap遍历方式

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class HashMapTest {
    private final static Map map = new HashMap(10000);
    private static final Object PRESENT = new Object();
    public static void main(String args[]) {
        long startTime;
        long endTime;
        long totalTime;
        for (int i = 0; i < 7500; i++) {
            map.put(i, PRESENT);
        }
        // 方法一
        startTime = System.nanoTime();
        Iterator iter1 = map.entrySet().iterator();
        while (iter1.hasNext()) {
            Map.Entry entry = (Map.Entry) iter1.next();
            Integer key = entry.getKey();
            Object val = entry.getValue();
        }
        endTime = System.nanoTime();
        totalTime = endTime - startTime;
        System.out.println("methor1 pays " + totalTime + " ms");
        
        // 方法二
        startTime = System.nanoTime();
        Iterator iter2 = map.keySet().iterator();
        while (iter2.hasNext()) {
            Object key = iter2.next();
            Object val = map.get(key);
        }
        endTime = System.nanoTime();
        totalTime = endTime - startTime;
        System.out.println("methor2 pays " + totalTime + " ms");
    }
}

运行结果:


5 性能对比

线程安全的使用 HashMap 有三种方式,分别为 Hashtable、SynchronizedMap()、ConcurrentHashMap。

Hashtable

使用 synchronized 来保证线程安全,几乎所有的 public 的方法都是 synchronized 的,而有些方法也是在内部通过 synchronized 代码块来实现。

synchronizedMap()

通过创建一个线程安全的 Map 对象,并把它作为一个封装的对象来返回。

ConcurrentHashMap

支持多线程对 Map 做读操作,并且不需要任何的 blocking 。这得益于 CHM 将 Map 分割成了不同的部分,在执行更新操作时只锁住一部分。根据默认的并发级别, Map 被分割成 16 个部分,并且由不同的锁控制。这意味着,同时最多可以有 16个 写线程操作 Map 。试想一下,由只能一个线程进入变成同时可由 16 个写线程同时进入(读线程几乎不受限制),性能的提升是显而易见的。但由于一些更新操作,如 put(), remove(), putAll(), clear()只锁住操作的部分,所以在检索操作不能保证返回的是最新的结果。

在迭代遍历 CHM 时, keySet 返回的 iterator 是弱一致和 fail-safe 的,可能不会返回某些最近的改变,并且在遍历过程中,如果已经遍历的数组上的内容变化了,不会抛出 ConcurrentModificationExceptoin 的异常。

什么时候使用 ConcurrentHashMap ?

CHM 适用于读者数量超过写者时,当写者数量大于等于读者时,CHM 的性能是低于 Hashtable 和 synchronizedMap 的。这是因为当锁住了整个 Map 时,读操作要等待对同一部分执行写操作的线程结束。

CHM 适用于做 cache ,在程序启动时初始化,之后可以被多个请求线程访问。

CHM 是Hashtable一个很好的替代,但要记住, CHM 的比 Hashrable 的同步性稍弱。

6 拓展:Java8 HashMap & ConcurrentHashMap

Java8 对 HashMap 和 ConcurrentHashMap 做了一些修改:

  • 二者均利用了红黑树,所以其数据结构由 数组 + 链表 + 红黑树 组成。我们知道,链表上的数据需要一个一个比较下去才能找到我们需要的,时间复杂度取决于链表的长度,为 O(n)。为了降低这一部分的开销,在 Java8 中,当链表中的元素超过了 8 个以后,会将链表转换为红黑树,这个时候时间复杂度就降为了 O(logN)
  • Java8 中 ConcurrentHashMap 摒弃 Java7 中的 Segment 的概念,使用了另一种方式实现保证线程安全。
阅读(1161) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~