高并发编程之CnocurrentHashMap—源码剖析

HashMap在多线程环境下有线程安全问题,因此在多线程环境下不要直接使用HashMap,而是使用下面几种不同的方式去代替:

(1)使用ConcurrentHashMap

(2)使用Hashtable

(3)使用Collections.synchronizedMap(Map map)方法将HashMap包装成一个线程安全的集合

不过鉴于要保证多线程环境下程序的并发度,后两种方案直接Pass,因为他们保证线程安全的机制是synchronized,在并发很高的环境下效率很低,此时我们就可以使用ConcurrentHashMap

一、jdk1.7的实现以及源码分析

1、 ConcurrentHashMap的数据结构

jdk1.7 的ConcurrentHashMap的底层数据结构是数组+链表,如下图所示

ConcurrentHashMap是由Segment数组结构和HashEetry数组组成

Segment继承了ReentrantLock,是一个可重入的锁,它在ConcurretnHashMap中扮演锁的角色

HashEntry则用于存储键值对。它们之间的关系是:一个ConcurrentHashMap包含了一个Segment数组,一个Segment里维护了一个HashEntry数组,HashEntry数组和HashMap结构类似,是一种数组+链表的结构,当一个线程需要对HashEntry中的元素修改的时候,必须先获得Segment锁,下面是ConcutrrentHashMap的类图(借用网上的一张图):

2、Segment内部类

首先先来熟悉一下Segment,Segment是ConcurrentHashMap的内部类,它在ConcurrentMap就是扮演锁的角色,主要组成如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {

    private static final long serialVersionUID = 2249069246763182397L;
    // 和 HashMap 中的 HashEntry 作用一样,真正存放key-value的bucket
    transient volatile HashEntry<K,V>[] table;
    transient volatile int count;
    // 记得快速失败(fail—fast)么?
    transient int modCount;
    // 大小
    transient int threshold;
     // 负载因子
    final float loadFactor;
    
    //构造方法
    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
    } 

}

HashEntry是一个和HashMap类似的数据结构,这里值的注意的是他被volatile修饰了。这里复习一下volatile的特性:

(1)可以保证共享变量在多线程之间的可见性,即一个线程修改了共享变量的值对于其他线程是这个修改后的是可见的;

(2)可以禁止指令重排序;

(3)volatile可以保证对单次读写操作的原子性;

3、ConcurrentHashMap的构造方法

ConcurrentHashMap的构造方法中通过initialCapacity、loadFactor、concurrencyLevel三个参数完成了对Segment数组、段偏移量segmentShift、段掩码segmentMask和每一个Segment里的HashEntry数组的初始化。我们来看看源码是如何实现的:

 public ConcurrentHashMap(int initialCapacity,   //初始容量 默认是16
                          float loadFactor,      //加载因子  默认0.75
                          int concurrencyLevel) { //并发度 由DEFAULT_CONCURRENCY_LEVEL 决定
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;   //计算ssize左移的次数
        int ssize = 1;   //计算segment的大小
        while (ssize < concurrencyLevel) {//segment的大小为2^n
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;//计算每个segment的大小
        if (c * ssize < initialCapacity)//若是条件成立,表示c有余数,所以增加一个segment
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    } 

我们来捋捋构造方法:

首先了解几个常量:

static final int DEFAULT_CONCURRENCY_LEVEL = 16; //初始的并发等级,通过并发等级来确定Segment的大小

static final int MIN_SEGMENT_TABLE_CAPACITY = 2;// segment的最小值.2

 static final int MAX_SEGMENTS = 1 << 16; //segment数组最大长度,65535

private static final int MAXIMUM_CAPACITY = 1 << 30;  //每个HashEnty数组的最大长度 2^30
初始化Segment数组
 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        //Segment数组的大小必须是2的幂,比兔2,4,8....
        int sshift = 0;   //计算ssize左移的次数
        int ssize = 1;   //计算segment的大小
        while (ssize < concurrencyLevel) {//segment的大小为2^n
            ++sshift;    //sshift
            ssize <<= 1;
        }
        //segmentShift默认值32-4=28
        this.segmentShift = 32 - sshift;
        //segmentMask默认值是 15 即 0000 0000 0000 1111
        this.segmentMask = ssize - 1;

由上面的代码可以知道,Segment数组的长度是ssize,这个参数又是通过concurrencyLevel计算出来的。我了通过按位与的散列算法来定位segment数组索引,必须保证segment数组的长度是2的幂,所以必须计算出一个大于等于concurrencyLevel的最小2个幂大小的数组长度。同时concurrencyLevel最大值是2^16,即65535个,这意味着Segment数组的大小最大也是65535。

初始化segmentShift和segmentMask

这两个变量狮是在定位segment是的散列算法中用的,segmentShift用于定位参与散列计算的位数,segmentShift=32-sshift,其中sshift等于ssize从1向左移的次数,在默认情况下concurrencyLevel是16,因此ssize=16,sshift=4。,所以默认情况下是28。

segmentMask是散列运算的掩码,等于ssize-1,默认是15,掩码的二进制的各个位一般都是1.因为ssize的最大长度是65536,所以segmentShift的最大值是16,segmentMask的最大值是65536.

初始化Segment
if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;//计算每个segment的大小
if (c * ssize < initialCapacity)//若是条件成立,表示c有余数,所以增加一个segment
     ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;  //cap=2
 while (cap < c)
     cap <<= 1;
//创建一个segment,并且放在 segment[ 0]
Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
//创建Segment数组 大小是ssize
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//使用CAS初始化Segment数组的第一个元素segment[0]
UNSAFE.putOrderedObject(ss, SBASE, s0);
//Segment[] segments=ss;
this.segments = ss;

上面代码中cap的值就是每个HashEntry数组的初始长度,他等于initialCapacity/ssize的倍数,如果有余数在+1。如果c大于1,就会取待遇等于c的2的幂的值,所以c只可能是1或者是2^n。

4、定位Segment

ConcurrentHashMap使用了分段锁Segment来维护不同段的数据,那么在插入和获取元素的时候,必须先通过算法定位到Segment上之后才可以在具体的HashEntry用类似HashMap找元素的方法来定位一个元素。首先来看看ConcurretnHashMap使用的hash算法:

private static int hash(int h){
    h+=(h<<15)^0xffffcd7d;
    h^=(h>>>10);
    h+=(h<<3);
    h^=(h>>>6);
    h+=(h<<2)+(h<<14);
    return h^(h>>>16);
}

通过这个hash算法对hashCode在散列,目的是减少hash冲突,是元素可以均匀的分布在egment数组内,提高容器的使用率。基于这个hash(int h)计算出来的hash值,Segment通过下面这个算法定位到具体的Segment:

final Segment<K,V> segmentFor(int h){
    return segments[(hash>>>segmentShift)&segmentMask];
}

默认情况下,segmentShift是28,segmentMask是15,因此hash>>>segmentShift的意思就是让hash值的高4位参与到定位Sengmet运算中,上面说到过,segmentMask是散列运算的掩码,它等于ssize-1,ssize又是一个2的幂的数,因此segmentMask二进制的低位是连续的1,那么最终决定Segment位置索引的就是hash>>>segmentShift的值。

5、put()操作

put操作需要在定位到Segment之后,对Segment加锁。

  • 如果value==null,空指针异常;

  • 如果value不是null

  • 首先,计算hash(key),最终的hash值是hashCode(key) ^ hashSeed之后,再经过各种位操作之后的值
  • 然后,通过(hash>>>segmentShift)& segmentMark算法得到Segment[]的下标

构造器中只初始化了Segment[]下标0处的Segment对象,在put时,确定了Segment的位置,先判断该位置上的Segment有没有初始化,没有,先初始化

  • 最后,调用Segment对象的put方法存入键值对。
  • 对每个Segment加锁
  • 确定HashEntry[]的下标。index = (tab.length - 1) & hash;
  • 如果当前HashEntry没有初始化,先初始化,并将键值对插入
  • 已经初始化,遍历HashEntry链,有重复的,新值覆盖旧值,否则,插入
  • 插入之前,判断是否需要扩容,扩容:Segment[]是不能扩容的,只能扩容一个个Segment中的HashEntry[]的大小为原来的两倍

下面结合源码和注释来说明:

  public V put(K key, V value) {
       //找到对应的segment,把key,value放入对应的segment中
        Segment<K,V> s;
        //value不允许为null
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        //定位Segment:(hash>>>sengmentShift)& segmentmask
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) 
            //由于Segment数组在初始化的时候只初始化了segments[0]这个槽位,
            //在插入的时候首先需要初始化segments[j]
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
  }

Segment内部的put()方法

 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //每个segment进行put操作的时候都要进行加锁操作 tryLock()方法尝试获取锁
            HashEntry<K,V> node = tryLock() ? null :
            //尝试获取锁失败,计进入 scanAndLockForPut流程,循环尝试获取锁
            scanAndLockForPut(key, hash, value);
     
            //执行到这里,Segment就已经加上锁了,可以安全执行了
            V oldValue;
            try {
                //每个Segment中维护了一个HashEntry数组叫table
                HashEntry<K,V>[] tab = table;
                //定位HashEntry: hash&(tab.lenght-1)
                int index = (tab.length - 1) & hash;
                //找到HashEntry数组中table[index]元素(链表头结点)
                HashEntry<K,V> first = entryAt(tab, index);
                //遍历链表
                for (HashEntry<K,V> e = first;;) {
                    //e就是每个从链表中取出的结点
                    if (e != null) {
                        //更新key-value
                        K k;
                        //hash值相等 并且equals方法返回true就说明这是要找的key
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                //替换新值
                                e.value = value;
                                ++modCount;
                            }
                            //修改成功,退出循环
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        //新增key-value
                        if (node != null)
                            //1)之前等待锁的时候,如果新的链表接待被创建了,那就把值设置进去,并把此结点的next指向当前链表的头结点
                            node.setNext(first);
                        else
                            //2)如果显得结点还没创建,那就直接new一个
                            node = new HashEntry<K,V>(hash, key, value, first);
                        //c就是当前HashEntry的元素多少 size
                        int c = count + 1;
                        //如果错过了阈值并且当前HashEntry的长度小于最大允许长度2^30就会扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            //如果没有超过阈值,那就把新新节点插入链表头部
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                         //添加成功,退出循环
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

获取写入锁

前面我们看到,在往某个 segment 中 put 的时候,首先会调用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,如果失败,那么进入到 scanAndLockForPut 这个方法来获取锁。

下面我们来具体分析这个方法中是怎么控制加锁的。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node

    // 循环获取锁
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 进到这里说明数组该位置的链表是空的,没有任何元素
                    // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 顺着链表往下走
                e = e.next;
        }
        // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
        //    lock() 是阻塞方法,直到获取锁后返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
                 //     所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。

这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。

6、rehash()扩容操作

Segment数组的长度一旦确定就不能改变,当一个Segment数组在插入新元素之前会先判断是否超过阈值,如果超阈值了,就会扩容,并且只是扩容当前Segment中的HashEntry[]。总的来说,Segment对扩容时机的判断比HashMap更恰当,因为HashMap的扩容是在元素添加会后判断的,如果超过阈值就会扩容,但是很有可能扩容之后就没有新元素插入了,这时HashMap就进行了一次无效的扩容。

private void rehash(HashEntry<K,V> node) {
           //因为rehash()操作是被put()调用的,pput()已经获取过锁了,这里就不需要再获取锁了
            HashEntry<K,V>[] oldTable = table;
            //获取旧表的容量
            int oldCapacity = oldTable.length;
            //新表对容量*2
            int newCapacity = oldCapacity << 1;
            //计算新的阈值
            threshold = (int)(newCapacity * loadFactor);
            //创建新的HashEntry数组,大小是旧数组的2倍
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            //遍历旧的HashEntry数组 把其中的元素移动到新的HashEntry数组中
            for (int i = 0; i < oldCapacity ; i++) {
                //HashEntry数组索引为i的槽位的值
                HashEntry<K,V> e = oldTable[i];
                //HashEntry数组索引为i的槽位的值如果不为空就搬移
                if (e != null) {
                    //尝试获取链表头结点的下一个结点
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        //该槽位只有一个节点,直接搬移过去
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        //链表上有多个结点
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        //由于扩容之后有些key的索引就会改变,需要找到key发生变化的结点和新索引
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                //新索引
                                lastIdx = k;
                                //节点
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            //扩容完成后插入新的结点
            int nodeIndex = node.hash & sizeMask; // add the new node
            //设置新节点为头结点 
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            //更新table
            table = newTable;
 }

7、get操作

Segment的get操作实现非常简单和高效。先经过一次hash(),然后使用散列值运算定位到Segment,在定位到聚义的元素的过程。

get操作的高效是因为整个get操作不需要加锁,为什么他不需要加锁呢?是因为get方法中使用的共享变量都被定义成了volatile类型,比如:统计当前Segment大小的count,和用于存储key-value的HashEntry。定义成volatile的变量能够在多个先后餐呢个之间保证可见性,如果与多个线程读,它读取到的值一定是当前这个变量的最新值。

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        //计算key的hash值
        int h = hash(key);
        //u为segment对象在数组中的偏移量
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        //s就是segment对象
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            //((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE是目标key的槽位在HashEntry的偏移量
            //下面的for循环就是遍历链表的
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    //找到目标值返回
                    return e.value;
            }
        }
        //没找到返回null
        return null;
 }

8、size操作

size()就是求当前ConcurrentHashMap中元素个数的方法,它是弱一致性的。方法的逻辑大致是:首先他会使用不加锁的模式去尝试多次计算 ConcurrentHashMap 的 size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入或删除,计算的结果是准确的,然后返回此结果;如果尝试了,他就会给每个 Segment 加上锁,然后计算 ConcurrentHashMap 的 size 返回。

public int size() {
  final Segment<K,V>[] segments = this.segments;
  //ConcurrentHashMap大小  
  int size;
  boolean overflow; // true if size overflows 32 bits
  //当前size最新统计的size大小  
  long sum;         // sum of modCounts
  //前一次的size大小  
  long last = 0L;   // previous sum
  //重试的次数  
  int retries = -1; // first iteration isn't retry
  try {
    for (;;) {
      //操作次数,加锁  
      if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
           //ensureSegment方法确保Segment是被初始化了的 
          ensureSegment(j).lock(); // force creation
      }
      sum = 0L;
      size = 0;
      overflow = false;
      //遍历所有Segment,把所有Segment中的元素个数相加==>这个值在modCount中  
      for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
          sum += seg.modCount;
          int c = seg.count;
          if (c < 0 || (size += c) < 0)
            overflow = true;
        }
      }
      if (sum == last)
        break;
      last = sum;
    }
  } finally {
    //结束后如果尝试的次数大于RETRIES_BEFORE_LOCK就说明加锁了,在这里需要把每个Segment解锁
    if (retries > RETRIES_BEFORE_LOCK) {
      for (int j = 0; j < segments.length; ++j)
        segmentAt(segments, j).unlock();
    }
  }
  //返回结果  
  return overflow ? Integer.MAX_VALUE : size;
}

二、jdk1.8的实现以及源码分析

1、ConcurrentHashMap的数据结构

JDK1.8的实现已经摒弃了Segment的概念,而是直接使用 数组+链表+红黑树 的数据结构来实现的,并发控制使用的是CAS+synchronized,jdk1.8版本的ConcurrentHashMap看起来就像是优化过之后线程安全的HashMap,虽然在JDK1.8中还能看到Segment,但是已经简化了属性,只是为了兼容旧版本。

2、重要的属性和内部类

//默认为0,当初始化或它表示即将创建的哈希表的大小,
//当扩容完成后表示下一次哈希表扩容的阈值
private transient volatile int sizeCtl;  
// hash表
transient volatile Node<K,V>[] table;  
//扩容时的新hash表
private transient volatile Node<K,V>[] nextTable; 
内部类Node
 static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
     
        Node(int hash, K key, V val, Node<K,V> next) {
          this.hash = hash;                         
          this.key = key;                           
          this.val = val;                           
          this.next = next;                         
        }                                              
 }

这个Node和jdk1.8中HashMap的Node是一样的。jdk1.8中的ConcurrentHashMap保证线程安全使用的是CAS+synchronized

内部类ForwardingNode

扩容时如果某个链表转移完毕,就会用ForwardingNode作为旧哈希表这个槽位的头结点,下面是他的属性和构造方法:

 /**                                                           
  * A node inserted at head of bins during transfer operations.
  */                                                           
 static final class ForwardingNode<K,V> extends Node<K,V> {    
     final Node<K,V>[] nextTable;                              
     ForwardingNode(Node<K,V>[] tab) {                         
         super(MOVED, null, null, null);                       
         this.nextTable = tab;                                 
     }                                                         
     
 }
内部类ReservationNode

用在compute以及cpmouteIfAbsent时用来占位,计算完成后替换为普通Node结点

/**                                                        
 * A place-holder node used in computeIfAbsent and compute 
 */                                                        
static final class ReservationNode<K,V> extends Node<K,V> {
    ReservationNode() {                                    
        super(RESERVED, null, null, null);                 
    }                                                      
                                                           
    Node<K,V> find(int h, Object k) {                      
        return null;                                       
    }                                                      
}                                                          
内部类 TreeBin

红黑树的头结点,存储root和first结点

static final class TreeBin<K,V> extends Node<K,V> {                      
    TreeNode<K,V> root;                                                  
    volatile TreeNode<K,V> first;                                        
    volatile Thread waiter;                                              
    volatile int lockState;                                              
    // values for lockState                                              
    static final int WRITER = 1; // set while holding write lock         
    static final int WAITER = 2; // set when waiting for write lock      
    static final int READER = 4; // increment value for setting read lock
}
内部类 TreeNode

红黑树的结点,存储关于它的parent、left、right结点

 /**                                                              
  * Nodes for use in TreeBins                                     
  */                                                              
 static final class TreeNode<K,V> extends Node<K,V> {             
     TreeNode<K,V> parent;  // red-black tree links               
     TreeNode<K,V> left;                                          
     TreeNode<K,V> right;                                         
     TreeNode<K,V> prev;    // needed to unlink next upon deletion
     boolean red;                                                 
                                                                  
     TreeNode(int hash, K key, V val, Node<K,V> next,             
              TreeNode<K,V> parent) {                             
         super(hash, key, val, next);                             
         this.parent = parent;                                    
     }  
 }

3、ConcurrentHashMap的构造方法

jdk1.8中的ConcurrentHashMap是懒惰初始化的,在构造方法中只是计算了一下哈希表的大小,只有在第一次使用到的时候才会真正创建。

 public ConcurrentHashMap(int initialCapacity,                                
                          float loadFactor, int concurrencyLevel) {           
     if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
         throw new IllegalArgumentException();                                
     if (initialCapacity < concurrencyLevel)   // Use at least as many bins   
         initialCapacity = concurrencyLevel;   // as estimated threads        
     long size = (long)(1.0 + (long)initialCapacity / loadFactor); 
     //tableSizeFor方法保证哈希表的大小是2的幂
     int cap = (size >= (long)MAXIMUM_CAPACITY) ?                             
         MAXIMUM_CAPACITY : tableSizeFor((int)size); 
     //把哈希表的大小赋给成员变量sizeCtl
     this.sizeCtl = cap;                                                      
 }                                                                            

4、添加元素操作—put()

首先我们通过源码看到,jdk1.8中ConcurrentHashMap不允许key或value为null,如果你违反了就会报NPE。

之后调用spread方法使得key的hash码更散列。

在之后会首先判断当前hash表时候被初始化了,因为jdk1.8中ConcurrentHashMap是懒惰初始化的,只有这个爱第一次使用的时候才会初始化。

如果哈希表已经初始化了,然后计算key的桶的位置,如果在这个位置还是null,那就直接把元素放到桶的这个位置即可;

在之后会判断当前hash表是否在扩容中,如果没有:

  • 首先对链表的头结点/红黑树的root结点上锁

  • 如果key的桶的位置还是链表,那就在链表的尾部插入新的元素,再次过程中还会统计链表的结点个数,存储在modCount中,用于在插入之后判断是否需要扩容

  • 如果key的桶的位置已经是红黑树了,那就把新的元素插入红黑树中

插入完毕之后会判断modCount的值是否已经大于等于树化阈值(8)了,如果是,那就把链表转换为红黑树

public V put(K key, V value) {
    return putVal(key, value, false);             
}

/** Implementation for put and putIfAbsent */                                     
final V putVal(K key, V value, boolean onlyIfAbsent) { 
    //ConcurrentHashMap不允许key-value为null
    if (key == null || value == null) throw new NullPointerException(); 
    //是的hash值更分布
    int hash = spread(key.hashCode());                                            
    int binCount = 0;                                                             
    for (Node<K,V>[] tab = table;;) {                                             
        Node<K,V> f; int n, i, fh; 
        //判断hash表是否存在
        if (tab == null || (n = tab.length) == 0) 
            //创建hash表,initTable方法使用CAS初始化hash表
            tab = initTable(); 
        //i=(n-1)&hash 计算出桶的位置,如果在这个桶中还没有元素就直接把新元素放入桶中
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 
            //添加元素使用了CAS,比synchronized性能高
            if (casTabAt(tab, i, null,                                            
                         new Node<K,V>(hash, key, value, null)))                  
                break;                   // no lock when adding to empty bin      
        }  
        //如果这个条件成立,说明当前hash表在扩容过程中,判断的依据是看桶的头结点是否是ForwardingNode
        else if ((fh = f.hash) == MOVED) 
            //其他线程过来帮忙扩容
            tab = helpTransfer(tab, f);                                           
        else {       
            //如果这个条件成立,说明当前hash表存在、(n-1)&hash 位子有元素了并且没有在扩容
            V oldVal = null;
            //对(n-1)&hash位置链表的头结点加锁
            synchronized (f) {     
                //再次确认链表的头结点有没有被移动
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { 
                        //槽位中的元素还是链表
                        binCount = 1;    //使用binCount统计当前桶中的元素个数     
                        for (Node<K,V> e = f;; ++binCount) {                      
                            K ek;  
                            //遍历链表,发现相同的key
                            if (e.hash == hash &&                                 
                                ((ek = e.key) == key ||                           
                                 (ek != null && key.equals(ek)))) {               
                                oldVal = e.val;   
                                //更新旧值
                                if (!onlyIfAbsent)                                
                                    e.val = value;
                                //更新完毕之后直接退出
                                break;                                            
                            }                                                     
                            Node<K,V> pred = e;                                   
                            if ((e = e.next) == null) { 
                                //这里就是新元素插入的逻辑了,可以看到还是使用了尾插法
                                pred.next = new Node<K,V>(hash, key,              
                                                          value, null);           
                                break;                                            
                            }                                                     
                        }                                                         
                    }                                                             
                    else if (f instanceof TreeBin) { 
                        //槽位中的元素已经构成了红黑树了,那就把元素加入到红黑树中
                        Node<K,V> p;                                              
                        binCount = 2;                                             
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,          
                                                       value)) != null) {         
                            oldVal = p.val;                                       
                            if (!onlyIfAbsent)                                    
                                p.val = value;                                    
                        }                                                         
                    }                                                             
                }                                                                 
            }                                                                     
            if (binCount != 0) {  
                //如果桶中元素个数大于等于树化阈值(8)就将链表转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD) 
                    //转化为红黑树的逻辑
                    treeifyBin(tab, i);                                           
                if (oldVal != null)                                               
                    return oldVal;                                                
                break;                                                            
            }                                                                     
        }                                                                         
    }                                                                             
    addCount(1L, binCount);                                                       
    return null;                                                                  
}                                                                                 

5、初始化哈希表—initTable()

在put过程中如果是第一次put,此时hash表还没有初始化,因此需要首先初始化哈希表。

我们一起来看看hash表的初始化逻辑吧!

 private final Node<K,V>[] initTable() {                               
     Node<K,V>[] tab; int sc;                                          
     while ((tab = table) == null || tab.length == 0) { 
         //在构造方法中sizeCtl就是hash表的初始大小
         if ((sc = sizeCtl) < 0)                                       
             Thread.yield(); // lost initialization race; just spin 
         //使用CAS将sc的值设置为-1 表示初始化table,设置成功后返回true
         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  
             //获取到锁,创建table.
             //此时其他线程会在while()循环中忙等待(即自旋)
             try {    
                 //再次检查hash表没有创建
                 if ((tab = table) == null || tab.length == 0) {       
                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;         
                     @SuppressWarnings("unchecked")  
                     //创建hash表
                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];   
                     table = tab = nt; 
                     //计算下次hash表扩容的阈值
                     sc = n - (n >>> 2);                               
                 }                                                     
             } finally { 
                 //之后sizeCtl就表示hash表的扩容阈值了
                 sizeCtl = sc;                                         
             }                                                         
             break;                                                    
         }                                                             
     }                                                                 
     return tab;                                                       
 }                                                                     

6、统计链表的长度—addCount()

在put操作的最后一个操作中他会调用addCount方法来统计哈希表的使用情况,如果已经超过阈值了就会扩容。具体请看注释分析。

//check就是链表中结点个数
private final void addCount(long x, int check) {                           
    CounterCell[] as;   //累加单元数组,这一点可以参考LongAdder类
    long b, s;  
    //初始化累加数组以及累加单元的操作,具体参考LongAdder类的设计思想
    if ((as = counterCells) != null ||                                     
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; 
        long v; 
        int m;                                      
        boolean uncontended = true;                                        
        if (as == null || (m = as.length - 1) < 0 ||                       
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||          
            !(uncontended =                                                
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {   
            fullAddCount(x, uncontended);                                  
            return;                                                        
        }                                                                  
        if (check <= 1)                                                    
            return;
        //获取哈希表中元素的个数
        s = sumCount();                                                    
    }                                                                      
    if (check >= 0) {                                                      
        Node<K,V>[] tab, nt; int n, sc;  
        //sizeCtl表示扩容阈值,s是当前哈希表中元素的个数如果超过阈值就会触发扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&       
               (n = tab.length) < MAXIMUM_CAPACITY) { 
            int rs = resizeStamp(n); 
            //其他线程进来之后发现sizeCtl<0(因为第一个线程把这个值设置为负数了)
            if (sc < 0) {                                                  
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||   
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 
                    transferIndex <= 0)                                    
                    break;    
                //如果新的hash表已经被创建了,就使用CAS帮助第一个线程完成扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))        
                    transfer(tab, nt);                                     
            }
            //正常情况下,第一个线程执行到这以后首先使用CAS把sizeCtl的值修改为一个负数,修改成功后就进入到transfer方法中扩容
            else if (U.compareAndSwapInt(this, SIZECTL, sc,                
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) 
                //第一个线程进来后初始化一个新的hash表
                transfer(tab, null);                                       
            s = sumCount();                                                
        }                                                                  
    }                                                                      
}  


//注意累加数组的类型定义CounterCell上使用了@sun.misc.Contended注解,
//这个注解可以避免缓存行伪共享问题
@sun.misc.Contended static final class CounterCell {
    //保存了每一个桶中元素的个数
    volatile long value;                              
    CounterCell(long x) { value = x; }              
}                                                   

7、获取元素操作—get()

get方法没有加锁,这依赖于volatile带来的便利性。

public V get(Object key) {                                              
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 
    //spread可以保证hash值是一个正整数
    int h = spread(key.hashCode()); 
    //hash表存在并且key的槽位不为null  还是使用了(n - 1) & h)定位了key的桶的位置
    if ((tab = table) != null && (n = tab.length) > 0 &&                
        (e = tabAt(tab, (n - 1) & h)) != null) { 
        //头结点就是要找的值
        if ((eh = e.hash) == h) {                                       
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))  
                return e.val;                                           
        }      
        //hash值小于0表示hash表在扩容中,或者已经转化为红黑树了
        else if (eh < 0)                                                
            return (p = e.find(h, key)) != null ? p.val : null;         
        while ((e = e.next) != null) { 
            //遍历链表,查找key对应的value
            if (e.hash == h &&                                          
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;                                           
        }                                                               
    }                                                                   
    return null;                                                        
}                                                                       

8、获取元素的个数—size()

public int size() {
    //直接调用sumCount()统计哈希表元素个数
    long n = sumCount();                                       
    return ((n < 0L) ? 0 :                                     
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);                                           
}    

final long sumCount() {                            
    CounterCell[] as = counterCells; 
    CounterCell a;
    long sum = baseCount;
    //counterCells中保存了每一个桶中元素的个数,现在统计只需要遍历counterCells把每个桶中元素个数相加即可
    if (as != null) {                              
        for (int i = 0; i < as.length; ++i) {      
            if ((a = as[i]) != null)               
                sum += a.value;                    
        }                                          
    }                                              
    return sum;                                    

9、扩容操作—transfer()

扩容的逻辑比较复杂,这里不再贴出源码了,我们来捋一下关键流程就好了:

  • 首先扩容会传进来两个参数:老的哈希表和新的哈希表(Node<K,V>[] tab, Node<K,V>[] nextTab),在第一个线程调用这个方法的时候新的哈希表nextTab=null
  • 当线程判断nextTab=null之后,会new一个是元hash表2倍大小的新哈希表Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  • 创建新哈希表完成之后就是链表的搬迁工作了。这个过程中值的注意的是,当一个线程把旧哈希表上桶中的元素搬迁到新的哈希表上之后或者这个桶中没有元素,它用一个ForwardingNode类型的结点挂在这个桶下,以此告诉其他线程这个桶位置已经被处理了,可以说非常高效。

  • 还需要注意的是,在移动桶中元素的过程中,还是需要对链表和红黑树分别处理,前者的判断依据是头结点的hash值大于等于0,后者的判断依据是头结点的hash值小于0并且头结点是TreeBin类的实例。

留言区

还能输入500个字符