ConcurrentHashMap1.7 如果对于hashmap中添加元素,如果加锁,会导致如果2个元素放的不是同一个位置上也会进行阻塞。
存储结构
1 2 3 ConcurrentHashMap Segment[] table; 每一个中table包含多个个HashEntry[] tab
1 2 3 4 5 6 static final class HashEntry <K ,V > { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发度更高(并发度就是 Segment 的个数)。Segment 继承自 ReentrantLock。
构造方法 与HashMap同理,只是多了一个并发级别
1 2 3 public ConcurrentHashMap () { this (DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }
与hashmap参数相同,只是多了一些关于并发的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 static final int DEFAULT_INITIAL_CAPACITY = 16 ;static final int DEFAULT_CONCURRENCY_LEVEL = 16 ;static final int MIN_SEGMENT_TABLE_CAPACITY = 2 ;static final int MAX_SEGMENTS = 1 << 16 ; static final int RETRIES_BEFORE_LOCK = 2 ;private transient final int hashSeed = randomHashSeed(this );final int segmentMask;final int segmentShift;final Segment<K,V>[] segments;transient Set<K> keySet;transient Set<Map.Entry<K,V>> entrySet;transient Collection<V> values;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0 ; int ssize = 1 ; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1 ; } this .segmentShift = 32 - sshift; this .segmentMask = ssize - 1 ; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1 ; Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int )(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); this .segments = ss; }
1 2 3 4 5 6 7 创建的步骤: 1.判断越界 2.初始化ssize 这个值要比concurrencyLevel大 并且是2的幂次方 3.计算每个Segment里面各自的容量:为传入容量/ssize ,并且判断是否这个值比最小的容量小,小就为最小容量。 4.创建一个Segment,其中的hashEntry大小为刚刚计算的大小。 5.创建Segment数组,这数组的长度为ssize,扩容时不改变。 6.将刚创建的一个Segment放入,方便未来新插入数值时,这个segment为null,方便取到 加载因子和阈值,相当于原型模式
Segment对象 可重入 每个segment相当于hashmap
1 static final class Segment <K ,V > extends ReentrantLock implements Serializable
每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。
Put操作 key不能为null
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public V put (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null ) s = ensureSegment(j); return s.put(key, hash, value, false ); }
这里hash使用高位进行计算,在每一个Segment中我们使用低位。
计算hash 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private int hash (Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); h += (h << 15 ) ^ 0xffffcd7d ; h ^= (h >>> 10 ); h += (h << 3 ); h ^= (h >>> 6 ); h += (h << 2 ) + (h << 14 ); return h ^ (h >>> 16 ); }
静态方法块 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; TBASE = UNSAFE.arrayBaseOffset(tc); SBASE = UNSAFE.arrayBaseOffset(sc); ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); HASHSEED_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("hashSeed" )); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift" )); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask" )); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments" )); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1 )) != 0 || (ts & (ts-1 )) != 0 ) throw new Error("data type scale not a power of two" ); SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); TSHIFT = 31 - Integer.numberOfLeadingZeros(ts); }
1 2 3 Integer.numberOfLeadingZeros(1 );
生成Segment 这里会出现并发冲突
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private Segment<K,V> ensureSegment (int k) { final Segment<K,V>[] ss = this .segments; long u = (k << SSHIFT) + SBASE; Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { Segment<K,V> proto = ss[0 ]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int )(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { if (UNSAFE.compareAndSwapObject(ss, u, null , seg = s)) break ; } } } return seg; }
这里多次判断是为了提高效率
Segment put方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 final V put (K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null ) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break ; } e = e.next; } else { if (node != null ) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1 ; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null ; break ; } } } finally { unlock(); } return oldValue; }
1 2 3 4 5 6 7 8 基本流程: 1.获取锁,非阻塞 2.计算出要放的位置,获得这个位置的第一个元素 3.循环判断这位置是否为null 3.1不为null,循环判断是否已经存在key相同的值,存在退出返回旧值,直到为null,进入3.2 3.2 null 创建一个新的节点,存储个数+1,查看是否需要进行扩容。将元素放入,打破循环。
获取这个方法也是获取内存的值
1 2 3 4 5 static final <K,V> HashEntry<K,V> entryAt (HashEntry<K,V>[] tab, int i) { return (tab == null ) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long )i << TSHIFT) + TBASE); }
放入操作也是用的UNSAFE类,保证修改的是内存中的值,而不是当前线程的值
1 2 3 4 static final <K,V> void setEntryAt (HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { UNSAFE.putOrderedObject(tab, ((long )i << TSHIFT) + TBASE, e); }
这个方法是如果存在这个key就直接不放入,也是传入标记为true的地方
1 2 3 4 5 6 7 8 9 10 11 public V putIfAbsent (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null ) s = ensureSegment(j); return s.put(key, hash, value, true ); }
加锁的机制
1 2 3 4 5 6 7 8 9 10 11 tryLock() while (!lock.trylock()){ } lock.unlock(); lock() 这个方法不怎么消耗
获取不到锁的时候就会执行这个方法,在获取锁的时候做其他事情 。 这里做的事情就是创建一个HashEntry对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private HashEntry<K,V> scanAndLockForPut (K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this , hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null ; int retries = -1 ; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0 ) { if (e == null ) { if (node == null ) node = new HashEntry<K,V>(hash, key, value, null ); retries = 0 ; } else if (key.equals(e.key)) retries = 0 ; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break ; } else if ((retries & 1 ) == 0 && (f = entryForHash(this , hash)) != first) { e = first = f; retries = -1 ; } } return node; }
扩容 这里的扩容只是针对每一个Segment内部的数组进行扩容,而不是对最外面的数组进行扩容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private void rehash (HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1 ; threshold = (int )(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1 ; for (int i = 0 ; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null ) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null ) newTable[idx] = e; else { HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null ; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
1 2 官方解释 将每个列表中的节点重新分类为新表。因为我们使用的是2 的幂展开,所以每个bin中的元素必须要么保持在相同的索引上,要么以两个幂的偏移量移动。我们通过捕捉由于下一个字段不更改而可以重用旧节点的情况来消除不必要的节点。统计上,在默认阈值下,当表翻倍时,只有大约六分之一需要克隆。它们替换的节点一旦不再被并发遍历表中的任何读线程引用,就会变成垃圾可选的节点。入口访问使用明文数组索引,因为它们后面跟着可挥发的写操作。
1 2 3 4 5 6 7 我的理解: 1.传入一个新的node节点 2.根据旧表的长度,扩容2倍,生成一个新的表 3.遍历旧的数组,看这个节点是否位null,不为null,就遍历这个链表 4.当遍历到链表最后几个节点要插入的位置相同时,进行记录。 5.将刚才遍历得到的最后几个直接插入到表中,再遍历链表,循环插入。 6.在将这个节点插入表中
size 在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。
ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。
尝试次数使用 RETRIES_BEFORE_LOCK 定义,该值为 2,retries 初始值为 -1,因此尝试次数为 3。
如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public int size () { final Segment<K,V>[] segments = this .segments; int size; boolean overflow; long sum; long last = 0L ; int retries = -1 ; try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) ensureSegment(j).lock(); } sum = 0L ; size = 0 ; overflow = false ; for (int j = 0 ; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null ) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0 ) overflow = true ; } } if (sum == last) break ; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
ConcurrentHashMap1.8 不同
没有1.7中的Segment数组,也就是没有分段锁了,使用synchronize来控制
JDK8中的扩容性能更高,支持多线程扩容,JDK7中也支持,因为7中是扩容单个Segment,但是性能没有JDK8的高,因为在JDK8中任意线程都可以帮助扩容
JDK8中元素个数的统计也不一样了,JDK8中增加了CounterCell来帮助计数,
这里使用treeBin存储红黑树
扩容 1.7对应的是每个Segment对象进行扩容
TreeBin 1 2 3 4 5 6 7 8 9 static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock
TreeBin是记录红黑树的类,不在像hashmap中一样只使用节点,因为在多线程进行操作的时候,红黑树的根节点会发生变化,这里就需要这个类来记录这颗红黑树的属性。这里是对这个表节点对象进行加锁,如果这个对象已经不存在了,其他线程就可以得到这个位置的锁。而利用这个类就可以得到这个节点所有的信息,其他线程是无法获取到的。
构造方法 参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private static final int MAXIMUM_CAPACITY = 1 << 30 ;private static final int DEFAULT_CAPACITY = 16 ;static final int TREEIFY_THRESHOLD = 8 ;static final int UNTREEIFY_THRESHOLD = 6 ;static final int MIN_TREEIFY_CAPACITY = 64 ;static final int MOVED = -1 ; static final int TREEBIN = -2 ; static final int RESERVED = -3 ; static final int HASH_BITS = 0x7fffffff ; transient volatile Node<K,V>[] table;private transient volatile Node<K,V>[] nextTable; private transient volatile int sizeCtl;
无参构造方法,这里与1.7不同
1 2 public ConcurrentHashMap () {}
put操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public V put (K key, V value) { return putVal(key, value, false ); } final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
初始化表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield(); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; }
1 2 3 4 5 private transient volatile int sizeCtl; volatile 保证内存的可见性,保证得到的值是最新的开始sizeCtl = 0 ,对数组初始化时sizeCtl变成-1 ,最后sizeCtl = 阈值 在sizeCtl是-1 时,一个线程对其进行扩容 另一个线程 Thread.yield(),让出cpu资源,在进行竞争,避免线程一直while
这3个操作是利用CAS的方式保证数据的一致行性
1 2 3 4 5 6 7 8 9 10 11 12 static final <K,V> Node<K,V> tabAt (Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long )i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt (Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long )i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt (Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long )i << ASHIFT) + ABASE, v); }
红黑树化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private final void treeifyBin (Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null ) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1 ); else if ((b = tabAt(tab, index)) != null && b.hash >= 0 ) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null , tl = null ; for (Node<K,V> e = b; e != null ; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null , null ); if ((p.prev = tl) == null ) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)) } } } } }
元素个数加减 先看看这个如何获取到这个map中总的个数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public int size () { long n = sumCount(); return ((n < 0L ) ? 0 : (n > (long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int )n); } final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
元素个数加和扩容 当多个线程竞争baseCount,并发冲突大,这里缓解了压力。使用了数组分块加,并且判断要不要进行扩容。
扩容:这里是根据步长进行元素转移
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } }
1 2 3 4 5 6 这里用到了LongAdder中的思路 private transient volatile long baseCount;CounterCell计数格子 每个线程算出一个随机值& cell的长度 算出在在cell的位置,如何对这个位置的数进行加
添加计数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 private final void fullAddCount (long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0 ) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { CounterCell[] as; CounterCell a; int n; long v; if ((as = counterCells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { CounterCell r = new CounterCell(x); if (cellsBusy == 0 && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean created = false ; try { CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break ; else if (counterCells != as || n >= NCPU) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { try { if (counterCells == as) { CounterCell[] rs = new CounterCell[n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = ThreadLocalRandom.advanceProbe(h); } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean init = false ; try { if (counterCells == as) { CounterCell[] rs = new CounterCell[2 ]; rs[h & 1 ] = new CounterCell(x); counterCells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (U.compareAndSwapLong(this , BASECOUNT, v = baseCount, v + x)) break ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 这段代码的说明: 操作数组必须判断这个数组是否是忙碌状态 1.拿到这个线程的hash值 collide=false 2.循环 判断这个数组是否为null 不为null 判断这个要加的数组位置有没有发出冲突 没有发生冲突,就判断数组是否有别的线程进行加,没有就把这个数组设置为忙碌,进行修改,跳出循环 位置发生冲突, 判断wasUncontended是不是false CAS有没有冲突,如果有,重新走一次循环把标准,并且置为true CAS尝试修改这个cell成功就跳出,不成功,就继续走一次循环 到这里如果这个线程判断这里的cell已经被扩容就把collide置位false重新循环,或者数组已经到达最大值, 不会进行扩容 上面的操作都不成功,判断collide是否为false ,修改collide为true在去循环 如果重新hash都失败的情况下 判断数组是否是忙碌状态,不是就对数组进行扩容,并把collide置位false再次进行循环。 这个操作每次都会改变hash值 null 判断这个数组是否在忙碌状态,也就是有没有其他线程在创建这个数组, 不忙碌,就用CAS加锁创建数组,把要改的值放入 忙碌,尝试一次CAS修改baseCount,修改成功退出循环,不成功,继续循环 3.
扩容 帮助扩容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) { transfer(tab, nextTab); break ; } } return nextTab; } return table; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 private final void transfer (Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null ) { try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true ; boolean finishing = false ; for (int i = 0 , bound = 0 ;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false ; else if ((nextIndex = transferIndex) <= 0 ) { i = -1 ; advance = false ; } else if (U.compareAndSwapInt (this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return ; finishing = advance = true ; i = n; } } else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); else if ((fh = f.hash) == MOVED) advance = true ; else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null , loTail = null ; TreeNode<K,V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (Node<K,V> e = t.first; e != null ; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0 ) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } } } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 判断新数组是不是null,是就创建一个新的数组 循环 while advance有没有找到自己要转移的位置,这里找位置是从右向左进行 判断i--是不是还在线程要转移的里面 或者以及完成转移,就跳出这个循环 取到transferIndex,还没有转移的数组位置,如果已经没有,就跳出循环 每个线程用CAS找到自己的i和bound 自己要转移数组的位置,或者以及完成自己线程步长的转移,再找到下一个元素的转移位置 这里判断现在要转移的元素已经全部转移完成,所有线程退出,直到最后一个线程的工作是把新数组赋值给旧数组。 根据找到位置判断这里是不是null,是就置为fwd 如果这里是fwd,就进行下次循环 如果这里不为null,就说明有元素,判断类型 链表的转移方式 找尾链,如何分高低链进行转移 与1.7的concurrent和1.8的hashmap类似 红黑树的转移方式 与1.8的hashmap类似
常见面试题 JDK7中的ConcurrentHashMap是怎么保证并发安全的? 主要利用Unsafe操作+ ReentrantLock+分段思想。 主要使用了Unsafe操作中的:
compareAndSwapObject:通过cas的方式修改对象的属性
putOrderedObject:并发安全的给数组的某个位置赋值
getObjectVolatile: 并发安全的获取数组某个位置的元素
分段思想是为了提高ConcurrentHashMap的并发量,分段数越高则支持的最大并发量越高,程序员可以通过concurrencyLevel参数来指定并发量。ConcurrentHashMap的内部类 Segment就是用来表示某- -个段的。
每个Segment就是一个小型的HashMap的,当调用ConcurrentHashMap的put方法是,最终会调用到Segment的put方法,而Segment类 继承了ReentrantLock,所以Segment自带可重入锁,当调用到Segment的put方法时,会先利用可重入锁加锁,加锁成功后再将待插入的key,value插入到小型HashMap中,插入完成后解锁。
JDK8中的ConcurrentHashMap是怎么保证并发安全的? 主要利用Unsafe操作+synchronized关键字。
Unsafe操作的使用仍然和JDK7中的类似,主要负责并发安全的修改对象的属性或数组某个位置的值。
synchronized主要负责在需要操作某个位置时进行加锁(该位置不为空),比如向某个位置的链表进行插入结点,向某个位置的红黑树插入结点。
JDK8中其实仍然有分段锁的思想,只不过JDK7中段数是可以控制的,而JDK8中是数组的每一个位置 都有一把锁。
当向ConcurrentHashMap中put-个key,value时 ,
首先根据key计算对应的数组下标i,如果该位置没有元素,则通过自旋的方法去向该位置赋值。
如果该位置有元素,则sInchronized会 加锁
加锁成功之后,在判断该元素的类型
a.如果是链表节点则进行添加节点到链表中
b.如果是红黑树则添加节点到红黑树
.添加成功后,判断是否需要进行树化
addCount,这个方法的意思是ConcurrentHashMap的元素个数加1,但是这个操作也是需要并发安全的,并且元素个数加1成功后,会继续判断是否要进行扩容,如果需要,则会进行扩容,所以这个方法很重要。
同时一个线程在put时如果发现当前ConcurrentHashMap正在进行扩容则会去帮助扩容。
JDK7和JDK8中的ConcurrentHashMap的不同点 这两个的不同点太多了…既包括了HashMap中的不同点,也有其他不同点,比如:
JDK8中没有分段锁了,而是使用synchronized来 进行控制
JDK8中的扩容性能更高,支持多线程同时扩容,实际上JDK7中也支持多线程扩容,因为JDK7中的扩 容是针对每个Segment的,所以也可能多线程扩容,但是性能没有JDK8高,因为JDK8中对于任意-个线程都可以去帮助扩容
JDK8中的元素个数统计的实现也不一样了,JDK8中增加了CounterCell来帮助计数,而JDK7中没有, JDK7中是put的时候每个Segment内部计数,统计的时候是遍历每个Segment对象加锁统计(当然有-点点小小的优化措施,看视频吧..。