java源码解析之ConcurrentHashMap

ConcurrentHashMap的并发实现

  • transient volatile Node<K,V>[] table;确保了table对所有线程的可见性
  • transient volatile Node<K,V>[] nextTable;用于hashmap重新resize时
  • 确定元素在hashmap中的位置

    1
    2
    3
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    通过源码可以看出,tabAt调用了Unsafe类的getObjectVolatile方法(获取obj对象中offset偏移地址对应的object型field的值,支持volatile load语义),再加上table是线程可见的,因此保证了tabAt拿到的都是最新的数据

  • 更新table数组中的元素

    • casTabAt

      1
      2
      3
      static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
      return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
      }

      通过源码可以看出,在更新table数组中的元素操作中,jdk使用了Unsafe类的CAS原子算法,实现元素的更改操作

    • setTabAt

      1
      2
      3
      static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
      U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
      }

      通过源码可以看出,在更新table数组中的元素操作中,以volatile写的形式更新table数组(volatile写的形式jvm会强制将写的数据刷新到主内存,使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量tab的缓存行无效)

  • put操作

    • 乐观锁

      1
      2
      3
      4
      5
      6
      7
      8
      9
      for (Node<K,V>[] tab = table;;) {
      Node<K,V> f; int n, i, fh;
      if (tab == null || (n = tab.length) == 0)
      tab = initTable();
      else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
      if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
      break; // no lock when adding to empty bin
      }
      }

      这里存在着java内存模型happen-before的应用,tabAt是volatile读,casTabAt具有volatile读写相同的内存语义,因此casTabAt所做的操作都能立即反馈会tabAt方法,并且,jdk在这里运用了for+cas更新实现无锁的更新操作——乐观锁,相比悲观锁性能有了一定的提升

    • 分段锁

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      for (Node<K,V>[] tab = table;;) {
      Node<K,V> f; int n, i, fh;
      if (tab == null || (n = tab.length) == 0)
      tab = initTable();
      else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
      ...
      }
      else {
      V oldVal = null;
      synchronized (f) {
      if (tabAt(tab, i) == f) {
      ...
      }
      }
      }
      }

      在jdk1.7的ConcurrentHashMap的实现中,是采用Segment数组管理table数组,每一个segment[]控制了table数组的一部分(这个控制通过hash来实现),通过对segment元素进行锁控制实现并发;而在jdk1.8中,直接舍弃了Segment数组,转而对table的元素进行锁控制,相比jdk1.7而言,锁的细粒度变小了,进而使得并发能力得到了提升

  • remove操作

    • replaceNode

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      final V replaceNode(Object key, V value, Object cv) {
      ...
      for (Node<K,V> e = f, pred = null;;) {
      K ek;
      if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
      V ev = e.val;
      if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
      oldVal = ev;
      if (value != null)
      e.val = value;
      else if (pred != null)
      pred.next = e.next;
      else
      setTabAt(tab, i, e.next);
      }
      break;
      }
      pred = e;
      if ((e = e.next) == null)
      break;
      }
      ...
      }

      通过循环查找key所对应的key,由于在进入循环时已经通过tabAt以及synchronized获取到了table对应元素的锁,因此直接使用volatile写更新table元素即可

  • 遍历操作(弱一致性)

    • 遍历操作

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      // ConcurrentHashMap

      public final Map.Entry<K,V> next() {
      Node<K,V> p;
      if ((p = next) == null)
      throw new NoSuchElementException();
      K k = p.key;
      V v = p.val;
      lastReturned = p;
      advance();
      return new MapEntry<K,V>(k, v, map);
      }

      // HashMap

      final class EntryIterator extends HashIterator implements Iterator<Map.Entry<K,V>> {

      public final Map.Entry<K,V> next() {
      return nextNode(); }
      }

      通过两个map的EntryIterator源码可以很容易看出,ConcurrentHashMap在遍历操作时,舍弃了fail-fast机制,迭代器next返回的元素是直接new了一个新的元素出来,因此迭代器线程依旧遍历的是旧的数据集

为什么扩容的大小总是以2的n次方

  • 加速hash运算

  • rehash的优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    for (int j = 0; j < oldCap; ++j) {
    Node<K,V> e;
    if ((e = oldTab[j]) != null) {
    oldTab[j] = null;
    if (e.next == null)
    newTab[e.hash & (newCap - 1)] = e;
    else if (e instanceof TreeNode)
    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
    else { // preserve order
    Node<K,V> loHead = null, loTail = null;
    Node<K,V> hiHead = null, hiTail = null;
    Node<K,V> next;
    do {
    next = e.next;
    if ((e.hash & oldCap) == 0) {
    if (loTail == null)
    loHead = e;
    else
    loTail.next = e;
    loTail = e;
    }
    else {
    if (hiTail == null)
    hiHead = e;
    else
    hiTail.next = e;
    hiTail = e;
    }
    } while ((e = next) != null);
    if (loTail != null) {
    loTail.next = null;
    newTab[j] = loHead;
    }
    if (hiTail != null) {
    hiTail.next = null;
    newTab[j + oldCap] = hiHead;
    }
    }
    }
    }

    使用的是2次幂的扩展(指长度扩为原来2倍),所以,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置,在扩充HashMap的时候,不需要像JDK1.7的实现那样重新计算hash,只需要看看原来的hash值新增的那个bit是1还是0就好了,是0的话索引没变,是1的话索引变成“原索引+oldCap”

为什么ConcurrentHashMap的get操作不需要加锁

先放上ConcurrentHashMapget源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// map容器是否被初始化以及tab[key]槽位上的元素不为null
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
// 如果槽位上的元素的hash值与key的hash值相同
if ((eh = e.hash) == h) {
// 如果两个key相同则找到
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
} else if (eh < 0)
// 如果计算出来的hash值小于0,则表示该槽位挂着的是一棵红黑树,采用二叉查找树的形式搜索
return (p = e.find(h, key)) != null ? p.val : null;
// 此处则确定是链表的形式,直接遍历查找比对即可
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

这里可以看到,get的整个源码并没有出现任何锁的操作,那么原因是是什么?这里需要回溯看下ConcurrentHashMap中的几个重要的成员对象

Node[] 数组

1
transient volatile Node<K,V>[] table;

Node 节点

1
2
3
4
5
6
7
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
...
}

可以看到,这两个重要的成员都被volatile进行了修饰,使得他们在并发的情况下数据的变动对于其他线程是可以立即感知的

  • 首先第一个tablevolatile修饰,确保了在扩容时,扩容后的结果对其他线程是立即可见的
  • 其次,Node节点中的val以及next熟悉被volatile修饰,确保下面操作在多线程之间的数据可见性
    • ConcurrentHashMap进行put一份数据<K, V>
    • 线程A对Map进行remove操作,线程B对Map进行get操作
    • 如果A 先于 B,由于volatile的特新,B无法get到被删除的数据

tabAt的函数操作

1
2
3
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

ConcurrentHashMap中这个函数总是见到,这个函数的作用是将变量以volatile的语义进行读操作。为什么需要这个操作呢?

1
2
3
4
5
6
7
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// map容器是否被初始化以及tab[key]槽位上的元素不为null
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
...
}
...

上述的代码中是一个tabAt的使用场景,入参是在方法中的局部变量tab,那么就很好理解了,因为tab是局部变量,他并没有被volatile所修饰,虽然他是被volatile table赋值的,但是并不代表tab也要volatile语义