ConcurrentHashMap1.7

如果对于hashmap中添加元素,如果加锁,会导致如果2个元素放的不是同一个位置上也会进行阻塞。

存储结构

image-20200809114648367
1
2
3
ConcurrentHashMap 
Segment[] table;
每一个中table包含多个个HashEntry[] tab
1
2
3
4
5
6
static final class HashEntry<K,V> { //与hashmap相似
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}

ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发度更高(并发度就是 Segment 的个数)。Segment 继承自 ReentrantLock。

构造方法

与HashMap同理,只是多了一个并发级别

1
2
3
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

与hashmap参数相同,只是多了一些关于并发的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//默认容量大大小
static final int DEFAULT_INITIAL_CAPACITY = 16;
//并发级别,也就是说默认创建 16 个 Segment。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//每个块最小的值
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

static final int MAX_SEGMENTS = 1 << 16;

static final int RETRIES_BEFORE_LOCK = 2;

private transient final int hashSeed = randomHashSeed(this);

final int segmentMask;

/**
* Shift value for indexing within segments.
*/
final int segmentShift;

/**
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments;

transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//限制
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1; //segment数组大小
//循环移位直到ssize大于concurrencyLevel 2的n次方
while (ssize < concurrencyLevel) {
++sshift; //这个是2的n次冥,就是左移次数
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;//长度
//判断是否越界
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;

int c = initialCapacity / ssize; //获得分块的数量
//向上取整 除法没有小数位,假如 ssize为16,initialCapacity为17, c为1,需要对c进行+1
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY; //获得分块最小值 为2
while (cap < c) //如果c比最小的小,直接使用cap作为数组大小。这里和hashmap相同都是需要2的次方
cap <<= 1;
// create segments and segments[0]
//加载因子,阈值,数组, 每次扩容都只是扩Segment 2 -> 4,并不是对最外面的进行扩容
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
//创建一个固定大小的Segment,不会发生改变
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//放入 ,方便未来新插入数值时,这个segment为null,方便取到 加载因子和阈值,相当于原型模式
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
1
2
3
4
5
6
7
创建的步骤:
1.判断越界
2.初始化ssize 这个值要比concurrencyLevel大 并且是2的幂次方
3.计算每个Segment里面各自的容量:为传入容量/ssize ,并且判断是否这个值比最小的容量小,小就为最小容量。
4.创建一个Segment,其中的hashEntry大小为刚刚计算的大小。
5.创建Segment数组,这数组的长度为ssize,扩容时不改变。
6.将刚创建的一个Segment放入,方便未来新插入数值时,这个segment为null,方便取到 加载因子和阈值,相当于原型模式

Segment对象 可重入

每个segment相当于hashmap

1
static final class Segment<K,V> extends ReentrantLock implements Serializable

每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。

1
transient int count;

Put操作

key不能为null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public V put(K key, V value) {
//创建一个新的Segment
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//计算hash,key不能为null
int hash = hash(key);
//与hashmap类似,移位与操作获得位置,只去最高几位进行计算。
int j = (hash >>> segmentShift) & segmentMask;
//这个segment第j个位置是否为null,不为null 这里使用了CAS
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
//这里就是去数组中第j个位置元素
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//这个位置为null,生成一个新的
s = ensureSegment(j);
//然后调用这Segment的put操作
return s.put(key, hash, value, false);
}

这里hash使用高位进行计算,在每一个Segment中我们使用低位。

计算hash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private int hash(Object k) {
int h = hashSeed;

if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}

h ^= k.hashCode();//key不能为null

// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}

静态方法块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static {
int ss, ts;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class tc = HashEntry[].class;
Class sc = Segment[].class;
TBASE = UNSAFE.arrayBaseOffset(tc);
SBASE = UNSAFE.arrayBaseOffset(sc);
ts = UNSAFE.arrayIndexScale(tc);
ss = UNSAFE.arrayIndexScale(sc);
HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("hashSeed"));
SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segmentShift"));
SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segmentMask"));
SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segments"));
} catch (Exception e) {
throw new Error(e);
}
if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
throw new Error("data type scale not a power of two");
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}
1
2
3
//这个方法会返回最高位的1前面的个数,例如
Integer.numberOfLeadingZeros(1); //31
//因为这个数组是2的幂次方

生成Segment

这里会出现并发冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
//取到这个key能放位置
long u = (k << SSHIFT) + SBASE; // raw offset
//生成的对象
Segment<K,V> seg;
//这里获得CAS,判断这里的值是否一样
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//原型模式
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length; //获得长度
float lf = proto.loadFactor; //加载因子
int threshold = (int)(cap * lf);//阈值
//创建
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//判断这个位置还不是吧空,双重检查
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
//创建这个对象
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//自旋锁,直到把这个值放到这个位置,这里与原子类相同,保证只有一个线程能对这个null赋值
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//获取到就对对象进行更新,如果为false说明有另外的线程对这个u位置进行了更新
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

这里多次判断是为了提高效率

Segment put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//获取锁,这里继承了可重入锁,
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);//这个方法的目的也是获取锁
V oldValue;
try {
//获取table
HashEntry<K,V>[] tab = table;
//计算hash
int index = (tab.length - 1) & hash;
//获取第一个值
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//这里和hashmap类似,判断key和hash是否相同,就是遍历,查看是否存在已有的值
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {//标记是否为true ,是就连值都不需要改变
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else { //这个位置没有元素
if (node != null) //色哥结点在等待的时候已经创建好了,就直接插入
node.setNext(first);
else
//创建一个新的值,使用头插法。
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;//count表示这个Segment存了多少元素
//达到阈值,进行扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else //将新生成的节点设置到node
setEntryAt(tab, index, node);
++modCount; //fast-fail
count = c; //元素了加1
oldValue = null; //旧元素为空
break;
}
}
} finally {
unlock();
}
return oldValue;
}
1
2
3
4
5
6
7
8
基本流程:
1.获取锁,非阻塞
2.计算出要放的位置,获得这个位置的第一个元素
3.循环判断这位置是否为null
3.1不为null,循环判断是否已经存在key相同的值,存在退出返回旧值,直到为null,进入3.2
3.2 null

创建一个新的节点,存储个数+1,查看是否需要进行扩容。将元素放入,打破循环。

获取这个方法也是获取内存的值

1
2
3
4
5
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}

放入操作也是用的UNSAFE类,保证修改的是内存中的值,而不是当前线程的值

1
2
3
4
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}

这个方法是如果存在这个key就直接不放入,也是传入标记为true的地方

1
2
3
4
5
6
7
8
9
10
11
public V putIfAbsent(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, true);
}

加锁的机制

1
2
3
4
5
6
7
8
9
10
11
tryLock() //判断这把锁我能不能获取到,能就返回true,不能就返回false

//这个方法会消耗cpu资源
while(!lock.trylock()){
//做其他事情
}
//线程获得到了锁
lock.unlock();

lock() //这把锁获取到时就会去加锁,获取不到就会阻塞
这个方法不怎么消耗

获取不到锁的时候就会执行这个方法,在获取锁的时候做其他事情 。这里做的事情就是创建一个HashEntry对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//获取hash值下标,不确定是否为null
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
//循环次数,当到达一定次数时,直接调用lock()阻塞方法
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//每次循环会进一个分支,每次遍历都是会判断有没有获取锁
if (retries < 0) {
//遍历链表
if (e == null) { //遍历完
//假如头结点改变,这个值也就不需要new了
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
//有key相等的,就不用new,
else if (key.equals(e.key))
retries = 0;
else //下一个
e = e.next;
}
//先加再比较,达到一定次数直接调用lock(),退出,这个值取cpu的核心数
// cpu核心>1 取64
// =1 取1
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//在偶数次重试的中间,判断这个位置的值还是否是first,就是判断其他线程有没有放入这个位置
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
//发生改变重新赋值,并重新遍历链表
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

扩容

这里的扩容只是针对每一个Segment内部的数组进行扩容,而不是对最外面的数组进行扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1; ///扩容
threshold = (int)(newCapacity * loadFactor);//算阈值
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];//新数组
int sizeMask = newCapacity - 1; //放去余
for (int i = 0; i < oldCapacity ; i++) { //遍历原数组
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask; //新数组的下标
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//这里是把后面所有放入同一个节点的数找出
for (HashEntry<K,V> last = next;
last != null;
last = last.next) { //从下一个节点开始遍历
int k = last.hash & sizeMask; //计算这个节点的下标
if (k != lastIdx) { //查看这个下标是否相同
lastIdx = k; //不相同就重新赋值
lastRun = last; //把这个节点也记录
}
}
//优化了最后几个节点的插入
//这个节点之后的数全部放入同一个位置
newTable[lastIdx] = lastRun;
// Clone remaining nodes
//从头节点开始
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
1
2
官方解释
将每个列表中的节点重新分类为新表。因为我们使用的是2的幂展开,所以每个bin中的元素必须要么保持在相同的索引上,要么以两个幂的偏移量移动。我们通过捕捉由于下一个字段不更改而可以重用旧节点的情况来消除不必要的节点。统计上,在默认阈值下,当表翻倍时,只有大约六分之一需要克隆。它们替换的节点一旦不再被并发遍历表中的任何读线程引用,就会变成垃圾可选的节点。入口访问使用明文数组索引,因为它们后面跟着可挥发的写操作。
1
2
3
4
5
6
7
我的理解:
1.传入一个新的node节点
2.根据旧表的长度,扩容2倍,生成一个新的表
3.遍历旧的数组,看这个节点是否位null,不为null,就遍历这个链表
4.当遍历到链表最后几个节点要插入的位置相同时,进行记录。
5.将刚才遍历得到的最后几个直接插入到表中,再遍历链表,循环插入。
6.在将这个节点插入表中

size

在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。

ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。

尝试次数使用 RETRIES_BEFORE_LOCK 定义,该值为 2,retries 初始值为 -1,因此尝试次数为 3。

如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//对每一个Segment加锁,第一次循环不会进入
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
//统计每一个Segment的长度,size表示这一次循环内的个数
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

ConcurrentHashMap1.8

不同

  1. 没有1.7中的Segment数组,也就是没有分段锁了,使用synchronize来控制

  2. JDK8中的扩容性能更高,支持多线程扩容,JDK7中也支持,因为7中是扩容单个Segment,但是性能没有JDK8的高,因为在JDK8中任意线程都可以帮助扩容

  3. JDK8中元素个数的统计也不一样了,JDK8中增加了CounterCell来帮助计数,

这里使用treeBin存储红黑树

扩容 1.7对应的是每个Segment对象进行扩容

TreeBin

1
2
3
4
5
6
7
8
9
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock

TreeBin是记录红黑树的类,不在像hashmap中一样只使用节点,因为在多线程进行操作的时候,红黑树的根节点会发生变化,这里就需要这个类来记录这颗红黑树的属性。这里是对这个表节点对象进行加锁,如果这个对象已经不存在了,其他线程就可以得到这个位置的锁。而利用这个类就可以得到这个节点所有的信息,其他线程是无法获取到的。

构造方法

参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
static final int MOVED = -1; // 表示正在转移
static final int TREEBIN = -2; // 表示已经转换成树
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
transient volatile Node<K,V>[] table;//默认没初始化的数组,用来保存元素
private transient volatile Node<K,V>[] nextTable;//转移的时候用的数组
/**
* 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75
* 当为负的时候,说明表正在初始化或扩张,
* -1表示初始化
* -(1+n) n:表示活动的扩张线程
*/
private transient volatile int sizeCtl;

无参构造方法,这里与1.7不同

1
2
public ConcurrentHashMap() {
}

put操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //这里每次获取新的数组,万一有的线程完成了转移
Node<K,V> f; int n, i, fh;
//初始化数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//这里用到UnSafe操作,直接从内存中获取值
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//通过CAS的方式从内存中设置值,如果失败,就继续循环判断值是否已经改变
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//这f已经有值了 node对象,有线程对这个tab对这个表进行扩容,帮助他进行扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { //这里对node节点加锁,这里没有在用分段锁
//判断f有没有被其他线程修改,还是不是头节点
if (tabAt(tab, i) == f) {
if (fh >= 0) { //链表上面的一个节点
binCount = 1;
//这里就是循环链表,看有没有相同key的,并且统计链表个数
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//尾插法
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//树上面的一个节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//把这就节点放入红黑树
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果这个链表大于这个阈值,就把它转化成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//对size加1
addCount(1L, binCount);
return null;
}

初始化表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//这里使用CAS堆sc进行修改,为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//初始化
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//这里是取0.75
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
1
2
3
4
5
private transient volatile int sizeCtl; 
volatile保证内存的可见性,保证得到的值是最新的
开始sizeCtl = 0 ,对数组初始化时sizeCtl变成-1 ,最后sizeCtl = 阈值
在sizeCtl是-1时,一个线程对其进行扩容
另一个线程 Thread.yield(),让出cpu资源,在进行竞争,避免线程一直while

这3个操作是利用CAS的方式保证数据的一致行性

1
2
3
4
5
6
7
8
9
10
11
12
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

红黑树化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //判断最小转成红黑树的值 默认64
tryPresize(n << 1);
//如果b的值不为null,并且hash大于0
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) { //对数组这个位置加锁
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//修改成双向链表
for (Node<K,V> e = b; e != null; e = e.next) {
//创建树节点
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//把这颗红黑树设置到这个数组index的位置,这个TreeBin就是红黑树的创建过程
//这里红黑树的创建过程
setTabAt(tab, index, new TreeBin<K,V>(hd))
}
}
}
}
}

元素个数加减

先看看这个如何获取到这个map中总的个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
//统计每一个cell的值,最终返回
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

元素个数加和扩容

当多个线程竞争baseCount,并发冲突大,这里缓解了压力。使用了数组分块加,并且判断要不要进行扩容。

扩容:这里是根据步长进行元素转移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//x:加的个数
//check:是否需要扩容 在remove的情况下就为-1
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//如果这个数组是空的直接进入 或 数组不为null再利用CAS修改baseCount 为b + x的长度失败,会进入
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//如果这个数组为空,或长度小于0
if (as == null || (m = as.length - 1) < 0 ||
//每个线程生成随机数&数组的长度 生成在数组的位置,如果这个位置不是空的进入下一个判断
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
//用CAS操作数组中的数字进行加x ,如果失败,就进入方法体
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//cell数组的初始化,或者直接+1 //uncontended用于判断CAS有没有失败
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//计算总的值
s = sumCount();
}
//这里是扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//判断当前的个数是不是大于这个阈值 && 数组是不是不为null && 数组的容量小于最大值
//
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) { //小于0说明已经在扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//一个线程获得了,成功的扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
1
2
3
4
5
6
这里用到了LongAdder中的思路

private transient volatile long baseCount;//个数

CounterCell计数格子
每个线程算出一个随机值& cell的长度 算出在在cell的位置,如何对这个位置的数进行加

添加计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//要加减的数量,   有没有CAS失败
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//拿到线程hash值
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//判断数组是否为null
if ((as = counterCells) != null && (n = as.length) > 0) {
//不为null,判断 这个位置是不是null
if ((a = as[(n - 1) & h]) == null) {
//是null,如果这个数组不忙
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create创建一个
//如果这个数组还不忙,就用CAS的方式设置为忙
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;//标准位
try { // Recheck under lock
CounterCell[] rs; int m, j;
//在加完锁的情况下再检查一遍
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;//放入
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//如果进方法之前CAS过冲突,重新计算hash,重新走一遍
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//没有发生CAS冲突 ,对这个cell进行+1,如果修改成功,就跳出循环,结束
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//如果这个线程判断这里的cell已经被修改就把collide置位false重新循环,并且现在数组的大小
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
//目的未来重新hash,如何重新尝试能否CAS成功
else if (!collide)
collide = true;
//如果重新hash还是失败 ,在此判断这个数组是不是忙,不忙就进行修改成忙
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//达到这里说明这个数组的长度不足以对抗这个并发度,就对这个数组进行扩容,每次扩容1倍
//然后转移原数组
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//生成一个新的hash值
h = ThreadLocalRandom.advanceProbe(h);
}
//这个数组为null 判断这个数组是不是忙 ,然后CAS设置这个数组为忙
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) { //如果这个数组还是没有改变,创建大小为2的数组进行复制
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {//忙
cellsBusy = 0;
}
if (init)//初始化完成直接退出
break;
}
//这个数组在忙,就一直对着basecount进行改变,看能不能成功
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
这段代码的说明:
操作数组必须判断这个数组是否是忙碌状态
1.拿到这个线程的hash值 collide=false
2.循环
判断这个数组是否为null
不为null 判断这个要加的数组位置有没有发出冲突
没有发生冲突,就判断数组是否有别的线程进行加,没有就把这个数组设置为忙碌,进行修改,跳出循环
位置发生冲突,
判断wasUncontended是不是false CAS有没有冲突,如果有,重新走一次循环把标准,并且置为true
CAS尝试修改这个cell成功就跳出,不成功,就继续走一次循环

到这里如果这个线程判断这里的cell已经被扩容就把collide置位false重新循环,或者数组已经到达最大值, 不会进行扩容

上面的操作都不成功,判断collide是否为false ,修改collide为true在去循环
如果重新hash都失败的情况下 判断数组是否是忙碌状态,不是就对数组进行扩容,并把collide置位false再次进行循环。

这个操作每次都会改变hash值

null 判断这个数组是否在忙碌状态,也就是有没有其他线程在创建这个数组,
不忙碌,就用CAS加锁创建数组,把要改的值放入
忙碌,尝试一次CAS修改baseCount,修改成功退出循环,不成功,继续循环

3.

扩容

帮助扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//sc+1表示+1个线程进行库容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//每个线程转移的步长最小的是16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating创建新数组
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //创建
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; //默认等于数组大小
}
int nextn = nextTab.length;
//创建一个对象,用于转移完后放置在旧数组的位置
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; //线程转移完步长,是否需要转移其他的
boolean finishing = false; // to ensure sweep before committing nextTab 当前线程的扩容是否完成
for (int i = 0, bound = 0;;) { //i和bound表示转移的长度
Node<K,V> f; int fh;
while (advance) { //控制区域
int nextIndex, nextBound;
if (--i >= bound || finishing)//判断是否换在转移过程 --i表示下一个数组位置 或 是否完成
advance = false;
else if ((nextIndex = transferIndex) <= 0) { //每次循环重新取值
i = -1;
advance = false;
}
//判断步长和数组大小谁大,将2个的差值赋给nextbound,然后修改内存中的transferIndex为nextbound,这里有2个用处,一是帮一个线程找到转移元素位置,如果这个线程已经完成,再继续从这里取一段进行扩容
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//相当于 数组长64 nextbound = 48 i = 63 找出线程要扩容的数组,退出循环
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//这里判断现在要转移的元素已经全部转移完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//这里在所有工作都完成的情况下,由最后一个线程进行把新数组赋值
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//这里是帮助线程,别的线程帮助完成 每多一个线程 sc+1
//这里减少
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//这里如果sc不是初始值的话,说明这个线程不是最后一个转移的线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true; //所有的线程都扩容完毕,就赋值为true
i = n; // recheck before commit 从新检查
}
}
//如果数组这个位置为null ,直接在这个位置放一个fwd
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) //如果这个位置已经是fwd节点,就跳出这个循环
advance = true; // already processed
else {
//如果这里有节点加锁转移
synchronized (f) { //对这个位置加锁,不能put
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {//是一个链表
//这里与1.7concurremt的相似,就是找出链表最后面放入同一个位置的链表
//也与1.8的hashmap类似
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//把最后的链表放入一个位置
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//然后形成高位和低位的链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//把链表放入,设置fwd
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//红黑树,与1.8的红黑树转移相同
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//找到这2条链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//判断红黑树需不需要改成链表,不需要的情况下判断另一个链表是不是为空,如果为空,表示树没有发生改变,否则从新生成红黑树。
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
判断新数组是不是null,是就创建一个新的数组
循环
while advance有没有找到自己要转移的位置,这里找位置是从右向左进行
判断i--是不是还在线程要转移的里面 或者以及完成转移,就跳出这个循环
取到transferIndex,还没有转移的数组位置,如果已经没有,就跳出循环
每个线程用CAS找到自己的i和bound 自己要转移数组的位置,或者以及完成自己线程步长的转移,再找到下一个元素的转移位置

这里判断现在要转移的元素已经全部转移完成,所有线程退出,直到最后一个线程的工作是把新数组赋值给旧数组。

根据找到位置判断这里是不是null,是就置为fwd
如果这里是fwd,就进行下次循环
如果这里不为null,就说明有元素,判断类型
链表的转移方式 找尾链,如何分高低链进行转移 与1.7的concurrent和1.8的hashmap类似
红黑树的转移方式 与1.8的hashmap类似

常见面试题

JDK7中的ConcurrentHashMap是怎么保证并发安全的?

主要利用Unsafe操作+ ReentrantLock+分段思想。
主要使用了Unsafe操作中的:

  1. compareAndSwapObject:通过cas的方式修改对象的属性

  2. putOrderedObject:并发安全的给数组的某个位置赋值

  3. getObjectVolatile: 并发安全的获取数组某个位置的元素

分段思想是为了提高ConcurrentHashMap的并发量,分段数越高则支持的最大并发量越高,程序员可以通过concurrencyLevel参数来指定并发量。ConcurrentHashMap的内部类 Segment就是用来表示某- -个段的。

每个Segment就是一个小型的HashMap的,当调用ConcurrentHashMap的put方法是,最终会调用到Segment的put方法,而Segment类 继承了ReentrantLock,所以Segment自带可重入锁,当调用到Segment的put方法时,会先利用可重入锁加锁,加锁成功后再将待插入的key,value插入到小型HashMap中,插入完成后解锁。

JDK8中的ConcurrentHashMap是怎么保证并发安全的?

主要利用Unsafe操作+synchronized关键字。

  • Unsafe操作的使用仍然和JDK7中的类似,主要负责并发安全的修改对象的属性或数组某个位置的值。
  • synchronized主要负责在需要操作某个位置时进行加锁(该位置不为空),比如向某个位置的链表进行插入结点,向某个位置的红黑树插入结点。

JDK8中其实仍然有分段锁的思想,只不过JDK7中段数是可以控制的,而JDK8中是数组的每一个位置 都有一把锁。

当向ConcurrentHashMap中put-个key,value时 ,

  1. 首先根据key计算对应的数组下标i,如果该位置没有元素,则通过自旋的方法去向该位置赋值。

  2. 如果该位置有元素,则sInchronized会 加锁

  3. 加锁成功之后,在判断该元素的类型

    a.如果是链表节点则进行添加节点到链表中
    b.如果是红黑树则添加节点到红黑树
  4. .添加成功后,判断是否需要进行树化

  5. addCount,这个方法的意思是ConcurrentHashMap的元素个数加1,但是这个操作也是需要并发安全的,并且元素个数加1成功后,会继续判断是否要进行扩容,如果需要,则会进行扩容,所以这个方法很重要。

  6. 同时一个线程在put时如果发现当前ConcurrentHashMap正在进行扩容则会去帮助扩容。

JDK7和JDK8中的ConcurrentHashMap的不同点

这两个的不同点太多了…既包括了HashMap中的不同点,也有其他不同点,比如:

  1. JDK8中没有分段锁了,而是使用synchronized来 进行控制
  2. JDK8中的扩容性能更高,支持多线程同时扩容,实际上JDK7中也支持多线程扩容,因为JDK7中的扩 容是针对每个Segment的,所以也可能多线程扩容,但是性能没有JDK8高,因为JDK8中对于任意-个线程都可以去帮助扩容
  3. JDK8中的元素个数统计的实现也不一样了,JDK8中增加了CounterCell来帮助计数,而JDK7中没有, JDK7中是put的时候每个Segment内部计数,统计的时候是遍历每个Segment对象加锁统计(当然有-点点小小的优化措施,看视频吧..。