高并发编程之CnocurrentHashMap—源码剖析

面试官:HashMap在多线程环境下存在线程安全问题,那你⼀般都是怎么处理这种情况的?

一般在多线程的场景,我都会使用好几种不同的方式去代替:

  • 使用Collections.synchorinzedMap构造一个线程安全的HashMap
  • 使用Hashtable
  • 使用ConcurrentHashMap

不过鉴于要保证多线程环境下程序的并发度,后两种方案直接Pass,因为他们保证线程安全的机制是synchronized,在并发很高的环境下效率很低,此时我们就可以使用ConcurrentHashMap

Collections.synchorinzedMap实现线程安全原理简单分析

Collections.synchorinzedMap和Hashtable实现线程安全的手段是一样的,都是使用synchronized关键字实现的。Collections.synchorinzedMap实现线程安全的方式是在SynchronizedMap中维护了一个Map对象和一个互斥锁对象mutex

Map<String,String> map=Conllections.synchronizedMap(new HashMap(16));

我们在调⽤这个⽅法的时候就需要传⼊⼀个Map,可以看到有两个构造器,如果你传⼊了mutex参数,
则将对象排斥锁赋值为传⼊的对象。如果没有传入metux参数,则将对象排斥锁赋值为this,即调⽤synchronizedMap的对象,就是上⾯的Map。

创建出synchronizedMap之后,再操作map的时候,就会对⽅法上锁,如图一眼望去全是,你说这效率能高吗?

使用synchronized锁住HashMap方法的调用,这样保证之后线程在调用HashMap方法之前必须获得SynchronizedMap中的互斥锁对象的锁才能继续执行,否则只会等待,从而实现了线程安全,其实实质和Hashtable一样,在高并发环境下不推荐使用,要用线程安全的Map还是得用ConcurrentHashMap。

一、jdk1.7的实现以及源码分析

Java7的ConcurrentHashMap里有多把锁,每一把锁用于其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率.这就是“分段锁”技术。

1、 ConcurrentHashMap的数据结构

jdk1.7 的ConcurrentHashMap的底层数据结构是数组+链表,ConcurrentHashMap是主要由Segment数组结构和 HashEntry数组结构组成。如下图所示。

整个ConcurrentHashMap 由一个个 Segment 组成,Segment 代表“部分” 或 “一段”的意思,所以很多地方都会将其描述为分段锁。

简单的理解,ConcurrentHashMap 是一个 Segment数组,Segment 通过继承 ReentrantLock来进行加锁,所以每次需要加锁的操作锁住的是一个 Segment,这样只要保证每个 Segment是线程安全的,也就实现了全局的线程安全性。每个Segment中维护了一个HashEntry数组,它用于存储键值对。

  • Segment继承了ReentrantLock,是一个可重入的锁,它在ConcurretnHashMap中扮演锁的角色

  • HashEntry则用于存储键值对。它们之间的关系是:一个ConcurrentHashMap包含了一个Segment数组,一个Segment里维护了一个HashEntry数组,HashEntry数组和HashMap结构类似,是一种数组+链表的结构,当一个线程需要对HashEntry中的元素修改的时候,必须先获得Segment锁。

下面是ConcutrrentHashMap的继承体系:

ConcurrentHashMap重要成员变量和常量

//默认并发度16,这个值也是默认Segment数组默认的大小,最大值是65535
static final int DEFAULT_CONCURRENCY_LEVEL = 16; 
//Segment数组最大大小,也即最大允许并发量是65535
static final int MAX_SEGMENTS = 1 << 16;  
static final int DEFAULT_INITIAL_CAPACITY = 16;   //HashEntry数组默认初始化大小16
static final int MAXIMUM_CAPACITY = 1 << 30;  //HashEntry数组最大容量2^30
static final float DEFAULT_LOAD_FACTOR = 0.75f;  //HashEntry数组默认加载因子0.75
static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //HashEntry最小容量
//和size()方法有关,当连续两次统计size有误之后才会上锁
static final int RETRIES_BEFORE_LOCK = 2; 
//Segment数组,存放HashEntry的地方,里面维护了一个HashEntry
final Segment<K,V>[] segments;
//这两个变量都是用于定位segment的
final int segmentMask;  //定位segments数组中segment掩码
final int segmentShift; //segment在segment数组中的位移量

2、Segment内部类

首先先来熟悉一下Segment,Segment是ConcurrentHashMap的内部类,它在ConcurrentHashMap就是扮演锁的角色,主要组成如下:

以及还有一个常量:

//不断尝试获取锁的次数,和CPU核数有关,多核CPU最多64次,单核CPU只尝试1次
static final int MAX_SCAN_RETRIES =Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

HashEntry是一个和HashMap类似的数据结构,这里值的注意的是他被volatile修饰了。这里复习一下volatile的特性:

(1)可以保证共享变量在多线程之间的内存可见性,即一个线程修改了共享变量的值对于其他线程是这个修改后的是可见的;

(2)可以禁止指令重排序;

(3)volatile可以保证对单次读写操作的原子性;

这里使用volatile修饰HashEntry数组的目的当然是为了保证内存的可见性问题。为什么需要保证HashEntry数组的内存可见性呢?

在往下看源码就会发现,jdk1.7以前的CHM的get操作是不需要加锁的,即可以并发的读,只有在写数据的时候才需要加锁,因此为了保证不同线程之前对于一个共享变量的数据一致,使用volatile再好不过。

3、ConcurrentHashMap的构造方法

ConcurrentHashMap的构造方法中通过initialCapacity、loadFactor、concurrencyLevel三个参数完成了对Segment数组、段偏移量segmentShift、段掩码segmentMask和每一个Segment里的HashEntry数组的初始化。我们来看看源码是如何实现的:

初始化

initialCapacity:初始容量,这个值指的是整个 ConcurrentHashMap的初始容量,实际操作的时候需要平均分给每个 Segment。

loadFactor:负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的。

concurrencyLevel:并行级别、并发数、Segment 数。默认是16,也就是说 ConcurrentHashMap 默认有16个 Segment,所以理论上,最多同时支持16个线程并发写。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它就不可以扩容了。最大值受MAX_SEGMENTS控制,为65535个。

new ConcurrentHashMap()无参进行初始化以后:

(1)Segment数组的长度是16,并且segments[0]初始化了,其他segments[i]还是null,在使用之前需要调用ensureSegment()方法执行初始化

(2)segments[i]的默认大小是2,也即HashEntry的默认大小是2,负载因子为0.75,得出初始阈值为1.5,也就插入第一个元素不会触发扩容,插入第二个进行一次扩容。

4、定位Segment

ConcurrentHashMap使用了分段锁Segment来维护不同段的数据,那么在插入和获取元素的时候,必须先通过算法首先定位到Segment上之后才可以在具体的HashEntry用类似HashMap找元素的方法来定位一个元素。首先来看看ConcurretnHashMap使用的hash算法:

通过这个hash算法对hashCode再散列,可以看到进行了非常复杂的移位运算,目的就是减少hash冲突,使元素可以均匀的分布在Segment数组内,提高容器的使用率。基于这个hash(int h)计算出来的hash值,Segment通过下面这个算法定位到具体的Segment:

通过上面的分析我们知道,在默认情况下segmentShift是28,segmentMask是15,因此hash>>>segmentShift的意思就是只让hash值的高4位参与到定位Sengmet运算中,上面说到过,segmentMask是散列运算的掩码,它等于ssize-1,ssize又是一个2的幂的数,因此segmentMask二进制的低位是连续的1,那么最终决定Segment位置索引的就是hash>>>segmentShift的值

5、put()方法


put流程梳理:
(1)首先检查value,如果value==null,抛出NPE;

(2)根据hash值定位到Segment,定位segments[j]的算法是:j=(hash>>>sgmentShift)&sgmentMask,其实j就是hash值的低4位

(3)获得到Segment后判断是否为null,如果是null,表示还没有初始化,那先调用ensureSegment()方法初始化Segment

(4)最后,调用Segment对象的put方法存入键值对。

4.1、首先通过父类ReentrantLock的tryLock()方法对每个Segment加锁,上锁成功之后继续执行,否则调用scanAndLockForPut()方法不断的调用tryLock()尝试获取锁,尝试的次数和CPU核数有关,单核CPU值尝试1次,多核CPU最多重试64次,如果重试结束还没有获取锁,那就阻塞等待锁;
4.2、确定HashEntry[i]的下标。index = (tab.length - 1) & hash;
4.3、如果当前HashEntry没有初始化,先初始化,并将键值对插入;
4.4、已经初始化,遍历HashEntry链,有重复的,新值覆盖旧值,否则,插入;
4.5、插入之后,判断是否需要扩容,扩容:Segment[]是不能扩容的,只能扩容一个个Segment中的HashEntry[]的大小为原来的两倍

Segment内部的put()方法

下面是Segment内部put方法的源码,比较重要,我这里把它列出来并写上详细的注释,大家可以参考学习。

 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
     //每个segment进行put操作的时候都要进行加锁操作 tryLock()方法尝试获取锁
     HashEntry<K,V> node = tryLock() ? null :
          //尝试获取锁失败,计进入 scanAndLockForPut流程,循环尝试获取锁
         scanAndLockForPut(key, hash, value);
     //执行到这里,Segment就已经加上锁了,可以安全执行了
     V oldValue;
     try {
         //每个Segment中维护了一个HashEntry数组叫table
         HashEntry<K,V>[] tab = table;
         //定位HashEntry: hash&(tab.lenght-1)
         int index = (tab.length - 1) & hash;
         //找到HashEntry数组中table[index]元素(链表头结点)
         HashEntry<K,V> first = entryAt(tab, index);
         //从链表头部遍历链表
         for (HashEntry<K,V> e = first;;) {
            //如果哈希槽位不空
           if (e != null) {
             //更新key-value
             K k;
            //hash值相等 并且equals方法返回true就说明这是要找的key,那就执行新值替换旧值的逻辑
             if ((k = e.key) == key ||
                 (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        //替换新值
                        e.value = value;
                        ++modCount;
                     }
                     //修改成功,退出循环
                     break;
                 }
                 e = e.next;
             }else {
                //不为空则需要新建⼀个 HashEntry 并加⼊到 Segment 中,同时会先判断是否需要扩容
                if (node != null)
                //1)之前等待锁的时候,如果新的链表接待被创建了,那就把值设置进去,并把此结点的next指向当前链表的头结点
                   node.setNext(first);
                else
                   //2)如果哈希表还没创建,那就直接new一个
                   node = new HashEntry<K,V>(hash, key, value, first);
                //c就是当前HashEntry的元素多少 size
                int c = count + 1;
                //如果错过了阈值并且当前HashEntry的长度小于最大允许长度2^30就会扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                   //如果没有超过阈值,那就把新新节点插入链表头部
                   setEntryAt(tab, index, node);
                 ++modCount;
                 count = c;
                 oldValue = null;
                 //添加成功,退出循环
                 break;
            }
        }
      } finally {
         unlock();
      }
      return oldValue;
  }

scanAndLockForPut()——获取写入锁

前面我们看到,在往某个 segment 中 put 的时候,首先会调用**tryLock()**方法尝试获取锁如果获取成功就继续执行put逻辑,否则调用 scanAndLockForPut(key, hash, value)

下面我们来具体分析这个方法中是怎么控制加锁的。

这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。

这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。

6、rehash()扩容方法

重复一下,Segment 数组不能扩容,扩容的是单个Segment内部数组 HashEntry[],扩容后,容量为原来的2倍。

触发扩容的时机:put 的时候,如果判断该值的插入会导致该 Segment内部的元素个数超过阈值,那么先进行扩容,再插值。该方法不需要考虑并发,因为 put 操作,是持有独占锁的。

private void rehash(HashEntry<K,V> node) {
           //因为rehash()操作是被put()调用的,put()已经获取过锁了,这里就不需要再获取锁了
            HashEntry<K,V>[] oldTable = table;
            //获取旧表的容量
            int oldCapacity = oldTable.length;
            //新表对容量*2
            int newCapacity = oldCapacity << 1;
            //计算新的阈值
            threshold = (int)(newCapacity * loadFactor);
            //创建新的HashEntry数组,大小是旧数组的2倍
            HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            //遍历旧的HashEntry数组 把其中的元素移动到新的HashEntry数组中
            for (int i = 0; i < oldCapacity ; i++) {
                //HashEntry数组索引为i的槽位的值
                HashEntry<K,V> e = oldTable[i];
                //HashEntry数组索引为i的槽位的值如果不为空就搬移
                if (e != null) {
                    //尝试获取链表头结点的下一个结点
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;  //idx=e.hash&(newcap-1)
                    if (next == null)   //  Single node on list
                        //该槽位只有一个节点,直接搬移过去
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        //槽位上有多个结点
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        //由于扩容之后有些key的索引就会改变,需要找到key发生变化的结点和新索引
                        for (HashEntry<K,V> last = next;last != null;last = last.next) {
                            //从头遍历链表结点
							int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                //新索引
                                lastIdx = k;
                                //节点
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            //扩容完成后插入新的结点
            int nodeIndex = node.hash & sizeMask; // add the new node
            //设置新节点为头结点 
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            //更新table
            table = newTable;
 }

7、get()方法

Segment的get操作实现非常简单和高效。先经过一次hash(),然后使用散列值运算定位到Segment,在定位到聚义的元素的过程。

get操作的高效是因为整个get操作不需要加锁,为什么他不需要加锁呢?是因为get方法中使用的共享变量都被定义成了volatile类型,比如:统计当前Segment大小的count,和用于存储key-value的HashEntry。定义成volatile的变量能够在多个线程之间保证内存可见性,如果与多个线程读,它读取到的值一定是当前这个变量的最新值。

get操作流程
(1)计算 hash值,找到 Segment数组中的下标。

(2)再次根据 hash 找到每个 Segment内部的 HashEntry[]数组的下标。

(3)遍历该数组位置处的链表,直到找到相等(== || equals)的 key。

get 是不需要加锁的:原因是get方法是将共享变量(table)定义为volatile,让被修饰的变量对多个线程可见(即其中一个线程对变量进行了修改,会同步到主内存,其他线程也能获取到最新值),允许一写多读的作。

8、size()方法

size()方法就是求当前ConcurrentHashMap中元素实际个数的方法,它是弱一致性的。方法的逻辑大致是:首先他会使用不加锁的模式去尝试计算 ConcurrentHashMap 的 size,比较前后两次计算的结果,结果一致就认为当前没有元素加入或删除,计算的结果是准确的,然后返回此结果;如果尝试了三次之后结果还是不一致,它就会给每个 Segment 加上锁,然后计算 ConcurrentHashMap 的 size 返回。

public int size() {
  final Segment<K,V>[] segments = this.segments;
  //ConcurrentHashMap大小  
  int size;
  boolean overflow; // true if size overflows 32 bits
  //当前size最新统计的size大小  
  long sum;         // sum of modCounts
  //前一次的size大小  
  long last = 0L;   // previous sum
  //重试的次数  
  int retries = -1; // first iteration isn't retry
  try {
    for (;;) {
      //超过尝试次数,加锁  
      if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
           //ensureSegment方法确保Segment是被初始化了的 
          ensureSegment(j).lock(); // force creation
      }
      sum = 0L;
      size = 0;
      overflow = false;
      //遍历所有Segment,把所有Segment中的元素个数相加==>这个值在modCount中  
      for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
          sum += seg.modCount;
          int c = seg.count;
          if (c < 0 || (size += c) < 0)
            overflow = true;
        }
      }
      if (sum == last)
        break;
      last = sum;
    }
  } finally {
    //结束后如果尝试的次数大于RETRIES_BEFORE_LOCK就说明加锁了,在这里需要把每个Segment解锁
    if (retries > RETRIES_BEFORE_LOCK) {
      for (int j = 0; j < segments.length; ++j)
        segmentAt(segments, j).unlock();
    }
  }
  //返回结果  
  return overflow ? Integer.MAX_VALUE : size;
}

二、jdk1.8的实现以及源码分析

1、ConcurrentHashMap的数据结构

JDK1.8的实现已经摒弃了Segment的概念,而是直接使用 数组+链表+红黑树 的数据结构来实现的,并发控制使用的是CAS+synchronized,jdk1.8版本的ConcurrentHashMap看起来就像是优化过之后线程安全的HashMap,虽然在JDK1.8中还能看到Segment的身影,但是已经简化了属性,只是为了兼容旧版本。下图是jdk1.8中ConcurrentHashMap的结构示意图:

结构上和 Java8 的ConcurrentHashMap和HashMap 基本上一样,不过它要保证安全性,所以源码上确实要复杂一些

2、ConcurrentHashMap的属性和初始化

(1)重要属性

private static final int DEFAULT_CAPACITY = 16;   //哈希表默认初始容量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;   //哈希表最大容量
private static final float LOAD_FACTOR = 0.75f;   //默认加载因子
static final int TREEIFY_THRESHOLD = 8;  //树化阈值,大于这个值将会执行树化逻辑
static final int UNTREEIFY_THRESHOLD = 6;  //树退回链表的阈值
private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //默认并发度
//最小哈希表树化阈值,当链表长度大于等于8时不一定会立即转化为红黑树,哈希表大小不够64只会扩容
static final int MIN_TREEIFY_CAPACITY = 64;
//默认为0,它表示即将创建的哈希表的大小,当扩容完成后表示下一次哈希表扩容的阈值
private transient volatile int sizeCtl;  
// hash表,存储数据的地方
transient volatile Node<K,V>[] table;  
//扩容时的新hash表
private transient volatile Node<K,V>[] nextTable; 

(2)初始化

根据initCapacity和加载因子loadFactory计算出size,然后寻找size的最近的2的幂,如果size的最大值是MAX_ARRAY_SIZE 即:Integer.MAX_VALUE-8

另外我们可以观察到,ConcurrentHashMap是懒惰初始化的,构造方法中只是将初始容量计算出来保存在sizeCtl变量中。再往下阅读源码会发现,他是在第一次put的时候执行哈希表的初始化的。

内部类 Node

Node是普通的哈希槽位结点或链表结点类型,保存了一个节点的hash、key、value、以及下一个结点。和HashMap在 jdk1.8的实现类似。下面是源码。

内部类 TreeNode

TreeNode是红黑树的结点,当哈希槽位链表长度大于8并且哈希表的大小大于64的时候,链表就会转化为红黑树,一个TreeNode界定中存储关于它的父节点parent、左孩子left、右孩子right、以及结点的红黑标记,下面是源码。

3、添加元素操作—put()

put()方法流程梳理:

(1)首先我们通过源码看到,jdk1.8中ConcurrentHashMap不允许key或value为null,如果你违反了就会报NPE。

(2)之后获取到hash值,在之后会首先判断当前hash表是否被初始化了,因为jdk1.8中ConcurrentHashMap是懒惰初始化的,只有第一次put的时候才会初始化。初始化会调用initTable方法,该方法中使用CAS设置seizeCtl的值来保证线程安全。

(3)如果哈希表已经初始化了,然后计算key的哈希桶的位置,如果在这个位置还是null,那就直接使用CAS设置新节点到桶的这个位置即可;定位桶的算法还是i=(tab.length-1)&hash

(4)如果检查发现当前槽位的头结点的hash值==-1(标记为是ForwardingNode结点了),表示当前hash表正在扩容中,此时其他线程过去帮忙扩容;

(5)如果当前hash表存在、(n-1)&hash 位子有元素了并且没有在扩容,那就是发生了哈希冲突,解决冲突之前首先对链表的头结点/红黑树的root结点上锁

  • 5.1、如果key的桶的位置还是链表,那就在链表的尾部插入新的元素,再次过程中还会统计链表的结点个数,存储在bitCount中,用于在插入之后判断是否需要扩容以及检查是否相同key的结点存在,如果存在那就执行值替换逻辑;

  • 5.2、如果key的桶的位置已经是红黑树了,那就把新的元素插入红黑树中

(6)插入完毕之后会判断binCount的值是否已经大于等于树化阈值(TREEIFY_THRESHOLD=8)了,如果是,那就把链表转换为红黑树,不过最后转不转得成还要看哈希表的容量是不是大于MIN_TREEIFY_CAPACITY(64),如果没有达到只会扩容

下面是put方法的源码,非常重要,我做了详细的注释,大家可以参考并理解。

//外面调用的方法,只是简单包装
public V put(K key, V value) {
    return putVal(key, value, false);             
}

/** Implementation for put and putIfAbsent */                                     
final V putVal(K key, V value, boolean onlyIfAbsent) { 
    //ConcurrentHashMap不允许key-value为null,如果违反会NPE
    if (key == null || value == null) throw new NullPointerException(); 
    //计算key的hash值,使得hash值更分布
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; 
        //判断hash表是否初始化了
        if (tab == null || (n = tab.length) == 0) 
            //没有初始化那就创建hash表,initTable方法使用CAS初始化hash表
            tab = initTable(); 
        //i=(n-1)&hash 计算出桶的位置,如果在这个桶中还没有元素就直接把新元素放入桶中
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 
            //添加元素使用了CAS,比synchronized性能高
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }  
        //如果这个条件成立,说明当前hash表在扩容过程中,判断的依据是看桶的头结点是否是ForwardingNode
        else if ((fh = f.hash) == MOVED) 
            //其他线程过来帮忙扩容
            tab = helpTransfer(tab, f);
        else {
            //如果这个条件成立,说明当前hash表存在、(n-1)&hash 位子有元素了并且没有在扩容
            V oldVal = null;
            //对(n-1)&hash位置链表的头结点加锁
            synchronized (f) {     
                //再次确认链表的头结点有没有被移动
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { 
                        //槽位中的元素还是链表
                        binCount = 1;    //使用binCount统计当前桶中的元素个数     
                        for (Node<K,V> e = f;; ++binCount) {                      
                            K ek;  
                            //遍历链表,发现相同的key
                            if (e.hash == hash &&                                 
                                ((ek = e.key) == key ||                           
                                 (ek != null && key.equals(ek)))) {               
                                oldVal = e.val;   
                                //更新旧值
                                if (!onlyIfAbsent)                                
                                    e.val = value;
                                //更新完毕之后直接退出
                                break;                                            
                            }                                                     
                            Node<K,V> pred = e;                                   
                            if ((e = e.next) == null) { 
                                //这里就是新元素插入的逻辑了,可以看到使用了尾插法
                                pred.next = new Node<K,V>(hash, key,              
                                                          value, null);           
                                break;                                            
                            }                                                     
                        }                                                         
                    }                                                             
                    else if (f instanceof TreeBin) { 
                        //槽位中的元素已经构成了红黑树了,那就把元素加入到红黑树中
                        Node<K,V> p;                                              
                        binCount = 2;                                             
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,          
                                                       value)) != null) {         
                            oldVal = p.val;                                       
                            if (!onlyIfAbsent)                                    
                                p.val = value;                                    
                        }                                                         
                    }                                                             
                }                                                                 
            }                                                                     
            if (binCount != 0) {  
                //如果桶中元素个数大于等于树化阈值(8)就将链表转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD) 
                    //转化为红黑树的逻辑
                    treeifyBin(tab, i);                                           
                if (oldVal != null)                                               
                    return oldVal;                                                
                break;                                                            
            }                                                                     
        }                                                                         
    }                                                                             
    addCount(1L, binCount);                                                       
    return null;                                                                  
}                                                                                 

初始化哈希表—initTable()

在put过程中如果是第一次put,此时hash表还没有初始化,因此需要首先初始化哈希表。初始化哈希表会调用initTable()方法,这个方法会使用CAS加锁,然后实例化一个hash表

我们一起来看看hash表的初始化逻辑吧!

 private final Node<K,V>[] initTable() {                               
     Node<K,V>[] tab; int sc;                                          
     while ((tab = table) == null || tab.length == 0) { 
         //在构造方法中sizeCtl就是hash表的初始大小
         if ((sc = sizeCtl) < 0)                                       
             Thread.yield(); // lost initialization race; just spin 
         //使用CAS将sc的值设置为-1 表示初始化table,设置成功后返回true
         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  
             //获取到锁,创建table.
             //此时其他线程会在while()循环中忙等待(即自旋)
             try {    
                 //再次检查hash表没有创建
                 if ((tab = table) == null || tab.length == 0) {       
                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;         
                     @SuppressWarnings("unchecked")  
                     //创建hash表
                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];   
                     table = tab = nt; 
                     //计算下次hash表扩容的阈值
                     sc = n - (n >>> 2);                               
                 }                                                     
             } finally { 
                 //之后sizeCtl就表示hash表的扩容阈值了
                 sizeCtl = sc;                                         
             }                                                         
             break;                                                    
         }                                                             
     }                                                                 
     return tab;                                                       
 }                                                                     

统计链表的长度—addCount()

在put操作的最后一个操作中他会调用addCount方法来统计哈希表的使用情况,如果已经超过阈值了就会扩容。具体请看注释分析。

//check就是链表中结点个数
private final void addCount(long x, int check) {                           
    CounterCell[] as;   //累加单元数组,这一点可以参考LongAdder类
    long b, s;  
    //初始化累加数组以及累加单元的操作,具体参考LongAdder类的设计思想
    if ((as = counterCells) != null ||                                     
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; 
        long v; 
        int m;                                      
        boolean uncontended = true;                                        
        if (as == null || (m = as.length - 1) < 0 ||                       
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||          
            !(uncontended =                                                
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {   
            fullAddCount(x, uncontended);                                  
            return;                                                        
        }                                                                  
        if (check <= 1)                                                    
            return;
        //获取哈希表中元素的个数
        s = sumCount();                                                    
    }                                                                      
    if (check >= 0) {                                                      
        Node<K,V>[] tab, nt; int n, sc;  
        //sizeCtl表示扩容阈值,s是当前哈希表中元素的个数如果超过阈值就会触发扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&       
               (n = tab.length) < MAXIMUM_CAPACITY) { 
            int rs = resizeStamp(n); 
            //其他线程进来之后发现sizeCtl<0(因为第一个线程把这个值设置为负数了)
            if (sc < 0) {                                                  
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||   
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 
                    transferIndex <= 0)                                    
                    break;    
                //如果新的hash表已经被创建了,就使用CAS帮助第一个线程完成扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))        
                    transfer(tab, nt);                                     
            }
            //正常情况下,第一个线程执行到这以后首先使用CAS把sizeCtl的值修改为一个负数,修改成功后就进入到transfer方法中扩容
            else if (U.compareAndSwapInt(this, SIZECTL, sc,                
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) 
                //第一个线程进来后初始化一个新的hash表
                transfer(tab, null);                                       
            s = sumCount();                                                
        }                                                                  
    }
}  


//注意累加数组的类型定义CounterCell上使用了@sun.misc.Contended注解,
//这个注解可以避免缓存行伪共享问题
@sun.misc.Contended 
static final class CounterCell {
    //保存了每一个桶中元素的个数
    volatile long value;
    CounterCell(long x) { value = x; }              
}

4、获取元素操作—get()

get()方法没有加锁,即可以并发的读,敢这么做重要是因为Node数组被volatile被修饰了,volatile保证了共享变量在多线程下的内存可见性,即其他线程只要一修改Node数组中的数据,get操作的线程马上就可以知道修改的数据。

get()方法逻辑梳理:

(1)计算出key的hash值;

(2)根据 hash 值找到哈希槽的位置:(n - 1) & hash

(3)根据该位置处头节点的性质进行查找:

1)如果该位置为 null,那么直接返回 null 就可以了

2)如果该位置处的节点刚好就是我们需要的,返回该节点的值即可

3)如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,后面调用 find 接口,程序会根据上下文执行具体的方法。

4)如果以上 3 条都不满足,那就是链表,进行遍历比对即可

5、获取元素的个数—size()

public int size() {
    //直接调用sumCount()统计哈希表元素个数
    long n = sumCount();                                       
    return ((n < 0L) ? 0 :                                     
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);                                           
}    

final long sumCount() {                            
    CounterCell[] as = counterCells; 
    CounterCell a;
    long sum = baseCount;
    //counterCells中保存了每一个桶中元素的个数,现在统计只需要遍历counterCells把每个桶中元素个数相加即可
    if (as != null) {                              
        for (int i = 0; i < as.length; ++i) {      
            if ((a = as[i]) != null)               
                sum += a.value;                    
        }                                          
    }                                              
    return sum;                                    

6、扩容操作—transfer()

扩容的逻辑比较复杂,这里不再贴出源码了,我们来捋一下关键流程就好了:

  • 首先扩容会传进来两个参数:旧哈希表和新哈希表(Node<K,V>[] tab, Node<K,V>[] nextTab),在第一个线程调用这个方法的时候新哈希表nextTab=null

  • 当线程判断nextTab=null之后,会new一个是旧hash表2倍大小的新哈希表Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];

  • 创建新哈希表完成之后就是元素结点的搬迁工作了。这个过程中值的注意的是,当一个线程把旧哈希表上桶中的元素搬迁到新的哈希表上之后或者这个桶中没有元素,它用一个ForwardingNode类型的结点挂在这个桶下,以此告诉其他线程这个桶位置已经被处理了,这样可以避免其他线程get或put时出现错误

  • 还需要注意的是,在移动桶中元素的过程中,还是需要对链表和红黑树分别处理,前者的判断依据是头结点的hash值大于等于0,后者的判断依据是头结点的hash值小于0并且头结点是TreeBin类的实例。

参考资料

留言区

还能输入500个字符