ConcurrentHashMap源码分析

本文的分析的源码是JDK8的版本,与JDK6的版本有很大的差异。实现线程安全的思想也已经完全变了,它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现,利用CAS算法。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想,但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。

与同是线程安全的老大哥HashTable相比,它已经更胜一筹,因此它的锁更加细化,而不是像HashTable一样为几乎每个方法都添加了synchronized锁,这样的锁无疑会影响到性能。

一、基本参数与构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
/* ---------------- Constants -------------- */

private static final int MAXIMUM_CAPACITY = 1 << 30;

private static final int DEFAULT_CAPACITY = 16;

static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

private static final float LOAD_FACTOR = 0.75f;

static final int TREEIFY_THRESHOLD = 8;

static final int UNTREEIFY_THRESHOLD = 6;

static final int MIN_TREEIFY_CAPACITY = 64;

private static final int MIN_TRANSFER_STRIDE = 16;

private static int RESIZE_STAMP_BITS = 16;

private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
}

最开始的Constants定义了一些类变量,意思基本与HashMap中的一样,就不具体看了。

1、内部类

1.1、Node类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/* ---------------- Nodes -------------- */
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;//带有同步锁的value
volatile Node<K,V> next;//带有同步锁的next指针

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//不允许直接改变value的值
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

Node类是最核心的类,用于存储键值对,和HashMap中的定义也很相似,但是有几点不同:

  1. 对value和next属性设置了volatile同步锁。
  2. 不允许setValue方法直接改变value属性。
  3. 增加find方法来辅助map.get方法。

1.2、TreeNode类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {

}
}

树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap继承自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。

1.3、TreeBin类

这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。

这里仅贴出它的构造方法。可以看到在构造TreeBin节点时,仅仅指定了它的hash值为TREEBIN常量,这也就是个标识为。同时也看到我们熟悉的红黑树构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}

1.4、ForwardingNode

一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}

2、重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   /* ---------------- Fields -------------- */
//与HashMap中一样,只是加了volatile
transient volatile Node<K,V>[] table;

//一个过渡的table表 只有在扩容的时候才会使用
private transient volatile Node<K,V>[] nextTable;

//base counter,经由CAS来update
private transient volatile long baseCount;

//hash表初始化或扩容时的一个控制位标识量
private transient volatile int sizeCtl;

private transient volatile int transferIndex;

private transient volatile int cellsBusy;

private transient volatile CounterCell[] counterCells;

// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

可以看出,基本所有的变量都被设置成volatile属性,

2.1、baseCount

用来记录map中的元素个数,但是返回的时候不会直接返回它,而是一个估计值,下面会详细介绍。

2.2、sizeCtl

sizeCtl是ConcurrentHashMap中出镜率很高的一个属性,是一个控制标识符,在不同的地方有不同的用途,不同的取值代表了不同的含义:

  • 负数代表正在进行初始化或扩容操作
  • -1代表正在初始化
  • -N 表示有N-1个线程正在进行扩容操作
  • 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。

3、构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/* ---------------- Public operations -------------- */
public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

与HashMap一样,前面几个都是指定了属性,并不会初始化数组。

4、三个核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* ---------------- Table element access -------------- */


@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。

4.1、tabAt

获得在i位置上的Node节点

4.2、casTabAt

利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少

在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改

因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果,后面会详细分析

4.3、setTabAt

利用volatile方法设置节点位置的值

二、Table数组的初始化与扩容

关于table数组初始化与扩容的关键方法

1、初始化方法initTable

对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
   
/* ---------------- Table Initialization and Resizing -------------- */

/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl<0表示有其他线程正在进行初始化,把线程挂起。对于table的初始化工作,只能有一个线程在进行。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//相当于0.75*n 设置一个扩容的阈值
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

初始化方法主要应用了关键属性sizeCtl 如果这个值〈0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n

2、扩容方法 transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
/*
* step1 计算每核处理的量stride,如果小于16,则强制赋值16
*/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range

/*
* step2 如果nextTable为空
*/
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//构造一个nextTable对象 它的容量是原来的两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//复制开始的位置
transferIndex = n;
}


int nextn = nextTab.length;
//构造一个连节点指针 用于标志位,fwd结点的hash值为-1,fwd.nextTable=nextTab
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//并发扩容的关键属性,等于true,说明此节点已经处理过
boolean advance = true;
//循环的关键变量,判断是否已经扩容完成,完成就return,退出循环
boolean finishing = false; // to ensure sweep before committing nextTab

/*
* step3 死循环遍历,直到finishing。
*/
for (int i = 0, bound = 0;;) {// 死循环
Node<K,V> f; int fh;


// 控制--i,遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;

if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}//TRANSFERINDEX 即用CAS计算得到的transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}



/*
* i<0说明已经遍历完旧的数组tab;
* i>=n什么时候有可能呢?在下面看到i=n,所以目前i最大应该是n吧。
* i+n>=nextn,nextn=nextTab.length,所以如果满足i+n>=nextn说明已经扩容完成
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable
if (finishing) {
nextTable = null;
table = nextTab;
//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//finishing和advance保证线程已经扩容完成了可以退出循环
finishing = advance = true;
i = n; // recheck before commit
}
}

/*
* f = tabAt(tab, i),对f进行判断
*/

//如果tab[i]为null,那么就把fwd插入到tab[i],表明这个节点已经处理过了
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);

//那么如果f.hash=-1的话说明该节点为ForwardingNode,说明该节点已经处理过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed


else {
//对f结点上锁,只有本线程处理该结点对于的链表,处理后插入插入ForwardingNode结点表明已经处理过了
synchronized (f) {
if (tabAt(tab, i) == f) {
//两个关键变量,ln原位置节点,hn新位置节点
Node<K,V> ln, hn;

//fh = f.hash,>=0说明为链表结点
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
/*
* 这边还对链表进行遍历,将其拆分为两个链表
* 根据结点的hash&n等于0和不等于0的进行拆分
* 拆分分两个链表分别放在新表中的i和i+n中
*
* 因为n变为2倍,那么n-1的mask范围在高位多1bit(红色),因此链表根据高位为0或者1分为两个子链表
* 高位为0的节点桶位置没有发生变化,高位为1的节点桶位置增加了n
*/
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}

//复制完成后的4步操作
setTabAt(nextTab, i, ln);//在nextTable[i]插入原节点
setTabAt(nextTab, i + n, hn);//在nextTable[i+n]插入新节点
setTabAt(tab, i, fwd);//在table[i]插入forwardNode节点,表示已经处理过该节点
advance = true;//设置advance为true 返回到上面的while循环中 就可以执行--i操作
}
//f为树结点,主体思想和链表差不多
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}

//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;

//复制完成后的4步操作,与上面链表的相同
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

整个扩容操作分为两个大部分

  1. 第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。
  2. 二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。

具体的流程为:

  1. 为每个内核均分任务,并保证其不小于16

  2. 若nextTab为null,则初始化其为原table的2倍;

  3. 死循环遍历,直到finishing。

    • 计算i的值,从后向前遍历数组完成复制,头结点f = tabAt(tab, i)
    • 节点为空,则插入ForwardingNode,表明该位置已经处理过了
    • 结点为ForwardingNode,说明已经处理过了,直接跳过
    • 否则说明该链表未被处理,则对f结点上锁,处理后插入插入ForwardingNode结点表明已经处理过了
      • f是链表节点(fh>=0),将原链表重新hash拆分为两部分,分别插入nextTable的i和i+n的位置
      • f是TreeBin节点(fh<0),也拆分为两部分,最后要判断如果不再需要tree结构则反转会链表
  • finishing时,nextTab赋给table,更新sizeCtl为新容量的0.75倍 ,完成扩容。

3、协助扩容方法helpTransfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//计算一个操作校验码
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//调用扩容方法,直接进入复制阶段
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的transfer方法可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。

三、put

1、put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
   static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
public V put(K key, V value) {
return putVal(key, value, false);
}
static final int spread(int h) {
//与HashMap稍有不同
return (h ^ (h >>> 16)) & HASH_BITS;
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {

//不允许key或value为null
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
//死循环,就是不断的尝试。何时插入成功,何时跳出。
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table为空的话,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根据hash值计算出在table里面的位置 i = (n - 1) & hash
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果这个位置没有值 ,直接放进去,不需要加锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//当遇(fh = f.hash) == MOVED,说明是一个ForwardingNode结点,正在进行扩容,帮助一起扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//待插入位置非空且不是ForwardingNode节点,即碰撞了
else {
V oldVal = null;
//

/**
* 结点上锁,这里的结点可以理解为hash值相同组成的链表的头结点
* 内置锁synchronized锁住了f,因为f是指定特定的tab[i]的,
* 所以就锁住了整行链表,这个设计跟分段锁有异曲同工之妙,只是其他读取操作需要用cas来保证
*/
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh〉0 说明这个节点是一个链表的节点 不是树的节点
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果这个节点是树节点,就按照树的方式插入值
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果链表长度已经达到临界值8 就需要把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的元素数量+1
addCount(1L, binCount);
return null;
}

put操作的流程:

  1. 判空: ConcurrentHashMap与HashMap不同的一点就是不允许key或value为null,如果为null则直接抛出异常

  2. hash:计算hash值。h=key.hashcode,调用spread计算hash=(h ^(h >>>16))& HASH_BITS

  3. 遍历table

    • 若table为null,则调用initTable初始化table数组

    • 根据hash值计算存放位置,即table的下标i=(n - 1) & hash

    • 对该位置的结点f = tabAt(tab, i = (n - 1) & hash))进行判断:

      • 如果是null, 调用casTabAt方法无锁插入

      • 如果是ForwardingNode结点,说明正在扩容,调用helpTransfer方法帮助一起复制

      • 剩下的情况就是非空且不是ForwardingNode节点,即碰撞了,将头节点上锁(保证了线程安全)

        区分链表节点和树节点,分别插入

  4. addCount

扩容与协助扩容方法上面已经分析过,下面看一下addCount方法。

2、addCount方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
* baseCount值加上x
* check<0,则不需要检测是否需要扩容
*/

private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次进来都baseCount都加1因为x=1
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//多线程CAS发生失败的时候执行
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//如果check值大于等于0 则需要检验是否需要进行扩容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);

//如果小于0说明已经有线程在进行扩容操作了
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;


if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

可以看出该方法大体分两步:

  1. 用CAS方法给baseCount加上指定的x,如果失败了,就执行fullAddCount方法。
  2. 如果指定的check小于0,你们在baseCount加上x后就要进行扩容检测,如果需要扩容,则调用transfer方法进行扩容。

四、其他方法

1、size与mappingCount方法

对于ConcurrentHashMap来说,这个table里到底装了多少东西其实是个不确定的数量,因为不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap也是大费周章才计算出来的。

2.1、size方法

1
2
3
4
5
6
7
8
public final int size()        { return map.size(); }

public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

旧版的方法。可以看出,size方法并没有直接返回,而是调用了sumCount方法。

2.1、mappingCount

mappingCount与size方法的类似 从下面的注释可以看出,应该使用mappingCount代替size方法,并且返回值至少一个估计值。如果sumCount时有线程插入或删除,实际数量是和mappingCount不同的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}

与size方法一样,都没有直接返回basecont值,而是调用了下面的sumCount方法

2.3、sumCount方法

接下来就看一下这个sumCount方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
   @sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

public boolean isEmpty() {
return sumCount() <= 0L; // ignore transient negative values
}

2、get与contains方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

public boolean containsKey(Object key) {
return get(key) != null;
}

public boolean containsValue(Object value) {
if (value == null)
throw new NullPointerException();
Node<K,V>[] t;
if ((t = table) != null) {
Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
for (Node<K,V> p; (p = it.advance()) != null; ) {
V v;
if ((v = p.val) == value || (v != null && value.equals(v)))
return true;
}
}
return false;
}

五、参考地址

http://blog.csdn.net/u010723709/article/details/48007881

https://yq.aliyun.com/articles/36781

http://www.cnblogs.com/huaizuo/archive/2016/04/20/5413069.html