ConcurrentHashMap的并发实现
- transient volatile Node<K,V>[] table;确保了table对所有线程的可见性
- transient volatile Node<K,V>[] nextTable;用于hashmap重新resize时
确定元素在hashmap中的位置
1
2
3static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}通过源码可以看出,tabAt调用了Unsafe类的getObjectVolatile方法(获取obj对象中offset偏移地址对应的object型field的值,支持volatile load语义),再加上table是线程可见的,因此保证了tabAt拿到的都是最新的数据
更新table数组中的元素
casTabAt
1
2
3static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}通过源码可以看出,在更新table数组中的元素操作中,jdk使用了Unsafe类的CAS原子算法,实现元素的更改操作
setTabAt
1
2
3static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}通过源码可以看出,在更新table数组中的元素操作中,以volatile写的形式更新table数组(volatile写的形式jvm会强制将写的数据刷新到主内存,使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量tab的缓存行无效)
put操作
乐观锁
1
2
3
4
5
6
7
8
9for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
}这里存在着java内存模型happen-before的应用,tabAt是volatile读,casTabAt具有volatile读写相同的内存语义,因此casTabAt所做的操作都能立即反馈会tabAt方法,并且,jdk在这里运用了
for+cas更新
实现无锁的更新操作——乐观锁,相比悲观锁性能有了一定的提升分段锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
...
}
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
...
}
}
}
}在jdk1.7的ConcurrentHashMap的实现中,是采用Segment数组管理table数组,每一个segment[]控制了table数组的一部分(这个控制通过hash来实现),通过对segment元素进行锁控制实现并发;而在jdk1.8中,直接舍弃了Segment数组,转而对table的元素进行锁控制,相比jdk1.7而言,锁的细粒度变小了,进而使得并发能力得到了提升
remove操作
replaceNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23final V replaceNode(Object key, V value, Object cv) {
...
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
...
}通过循环查找key所对应的key,由于在进入循环时已经通过tabAt以及synchronized获取到了table对应元素的锁,因此直接使用volatile写更新table元素即可
遍历操作(弱一致性)
遍历操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// ConcurrentHashMap
public final Map.Entry<K,V> next() {
Node<K,V> p;
if ((p = next) == null)
throw new NoSuchElementException();
K k = p.key;
V v = p.val;
lastReturned = p;
advance();
return new MapEntry<K,V>(k, v, map);
}
// HashMap
final class EntryIterator extends HashIterator implements Iterator<Map.Entry<K,V>> {
public final Map.Entry<K,V> next() {
return nextNode(); }
}通过两个map的EntryIterator源码可以很容易看出,ConcurrentHashMap在遍历操作时,舍弃了fail-fast机制,迭代器next返回的元素是直接new了一个新的元素出来,因此迭代器线程依旧遍历的是旧的数据集
为什么扩容的大小总是以2的n次方
加速hash运算
rehash的优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}使用的是2次幂的扩展(指长度扩为原来2倍),所以,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置,在扩充HashMap的时候,不需要像JDK1.7的实现那样重新计算hash,只需要看看原来的hash值新增的那个bit是1还是0就好了,是0的话索引没变,是1的话索引变成“原索引+oldCap”
为什么ConcurrentHashMap的get操作不需要加锁
先放上ConcurrentHashMap
的get
源码
1 | public V get(Object key) { |
这里可以看到,get
的整个源码并没有出现任何锁的操作,那么原因是是什么?这里需要回溯看下ConcurrentHashMap
中的几个重要的成员对象
Node[] 数组
1 | transient volatile Node<K,V>[] table; |
Node 节点
1 | static class Node<K,V> implements Map.Entry<K,V> { |
可以看到,这两个重要的成员都被volatile
进行了修饰,使得他们在并发的情况下数据的变动对于其他线程是可以立即感知的
- 首先第一个
table
用volatile
修饰,确保了在扩容时,扩容后的结果对其他线程是立即可见的 - 其次,
Node
节点中的val
以及next
熟悉被volatile
修饰,确保下面操作在多线程之间的数据可见性- 对
ConcurrentHashMap
进行put
一份数据<K, V> - 线程A对
Map
进行remove
操作,线程B对Map
进行get
操作 - 如果A 先于 B,由于
volatile
的特新,B无法get
到被删除的数据
- 对
tabAt的函数操作
1 | static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { |
在ConcurrentHashMap
中这个函数总是见到,这个函数的作用是将变量以volatile
的语义进行读操作。为什么需要这个操作呢?
1 | Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; |
上述的代码中是一个tabAt
的使用场景,入参是在方法中的局部变量tab
,那么就很好理解了,因为tab
是局部变量,他并没有被volatile
所修饰,虽然他是被volatile table
赋值的,但是并不代表tab
也要volatile
语义