高并发编程之高性能原子类LongAdder—源码剖析

高性能原子类是java8中增加的原子类,它们使用分段的思想(Cell[]),把不同的线程hash到不同的段上去更新,最后再把这些段的值相加得到最终的值,相对Atomic类这些类运行性能更高,这些类主要有:

(1)Striped64:下面四个类的父类。

(2)LongAccumulator:long类型的聚合器,需要传入一个long类型的二元操作,可以用来计算各种聚合操作,包括加减乘除模。

(3)LongAdder:long类型的累加器,LongAccumulator的特例,只能用来计算加法,且从0开始计算。

(4)DoubleAccumulator:double类型的聚合器,需要传入一个double类型的二元操作,可以用来计算各种聚合操作,包括加减乘除模。

(5)DoubleAdder:double类型的累加器,DoubleAccumulator的特例,只能用来计算加法,且从0开始计算。

这里我以LongAdder为例来探究一下由大神Doug Lea操刀编写的作品之精妙!!!

一、LongAdder中几个关键的属性

LongAdder继承自Striped64抽象类,Striped64中定义了Cell内部类和各重要属性。

LongAdder的原理是,在最初没有竞争的时候使用CAS更新base值就可以了,当有多线程有过竞争时通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的LongAdder存储的值。

注意!

最初没有竞争是指一开始没有没有线程之间的竞争,但也有可能是多线程在操作,只是多个线程没有同时更新base的值。

有过竞争是指只要出现过竞争不管后面有没有竞争都使用cells更新值,规则是不同的线程hash到不同的cell上去更新,减少竞争。

二、缓存行伪共享与解决方法

1、CPU缓存架构

CPU 是计算机的心脏,所有运算和程序最终都要由它来执行。主内存(RAM)是运行时数据存放的地方,CPU 和主内存之间有好几级缓存(一般是3级缓存L1、L2、L3),因为即使直接访问主内存也是非常慢的,下面是一个CPU访问缓存和访问内存的速度比较:

从CPU到大约需要的时总周期
寄存器1
L1(一级缓存)3~4
L2(二级缓存)10~20
L3(三级缓存)40~45
内存120~240

由于内存和CPU的速度差距还是比较大的,为了提高效率,因此需要在CPU和内存之间加缓存预读取数据来提升效率。

2、缓存行和伪共享问题

​ 缓存是由缓存行组成的,通常是 64 字节(常用处理器的缓存行是 64 字节的,比较旧的处理器缓存行是 32 字节),并且它有效地引用主内存中的一块地址。

一个 Java 的 long 类型是 8 字节,因此在一个缓存行中可以存 8 个 long 类型的变量。

当缓存行中的数据被一个CPU修改后,为了保证缓存一致性,其他CPU相同缓存行中的数据就都必须失效。

表面上看这套机制似乎完美,但是请看这么一个场景:主内存中有两个相邻的值a、b都被加载进了不同核心的缓存中,不出意外的话这两个变量在同一个缓存行里,带来的问题就是比如当核心1对a的值做了修改,那么为了保证缓存一致性,其他核心中缓存了a的缓存行都必须失效,问题就在于失效 a 的缓存的同时,也会把 b 失效,反之亦然,虽然保证了缓存的一致性,但是却带来了效率上的降低。

这个问题在LongAdder中是存在的

在LongAdder中Cell是数组的形式,数组我们都知道空间是连续分配的,一个Cell对象的内存占用在64位系统中是24字节(无论有没有开启指针压缩)。因此一个缓存行可以同时最多放下两个Cell对象,这样在同一个缓存行中 的两个Cell对象就会出现一个修改后导致另一个失效的问题。

在jdk1.8中使用@sun.misc.Contended注解解决了这个问题,它的原理是在使用此注解的对象或字段前后加上128字节的padding,使用2倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突,从而将这个不同的对象预存到不同的缓存行中。

下面是在Striped64中内部类Cell,在他头上就是使用注解@sun.misc.Contended来防止防止缓存行伪共享的。

三、核心方法源码解析

LongAdder是一个专门做累加的高性能原子类,他只能做加减运算并且初始值是0,这一点从他的构造方法可以看出。下面是LongAdder做类似i++操作的示例:

public class LongAdderTest {

    public static void main(String[] args) {
        LongAdder adder = new LongAdder();   //默认是0
        adder.increment();
    }

}

从increment()方法开始我们分析一下具体的执行原理。increment()是对add(long)方法的再次封装,核心逻辑还得看add(long)方法。

1、add(long)源码解析

add(long x)方法是LongAdder的主要方法,使用它可以使LongAdder中存储的值增加x,x可为正可为负。

/**                                                                                   
 * Adds the given value.        
 *  
 * @param x the value to add    
 */     
public void add(long x) {                                                                   
    /**
     * as:父类Striped64中的累加单元数组    
     * b:当前base值
     * v:当前线程hash到Cell中存储的值   
     * m:cells的长度-1,在hash的时候做掩码使用   
     * a:当前线程hash到的Cell      
     */  
    Cell[] as; long b, v; int m; Cell a;   
    /**                                                                               
     *条件 1:累加数组cells为空说明之前没有出现过竞争,此时就使用CAS更新base的值,底层调用了Unsafe的compareAndSwapLong()方法       
     *条件 2:累加数组cells为空,说明之前出现过竞争,此时每个线程更新各自的在cells中的Cell 
     */                                                                                     
    if ((as = cells) != null || !casBase(b = base, b + x)) {             
        //true 表示当前竞争还不是很激烈                                                   
        //false 表示当前竞争很激烈,多个线程hash到同一个Cell,可能要扩容               
        boolean uncontended = true; 
        /** 
         * 条件 1:累加数组尾空,如果此条件成立,说明正在出现竞争(是从上面条件2CAS更新失败后过来的)                                 
         * 条件 2:此条件一般不会成立,因为他如果可以成立说明cells数组的长度为0,那as==null也就会成立                            
         * 条件 3:当前线程所在的Cell为空,说明当前线程还没有更新过Cell,应初始化一个Cell                                   
         * 条件 4:更新当前线程所在的Cell失败,说明现在竞争很激烈,多个线程hash到了同一个Cell,应扩容                             
         */                                                                           
        if (as == null ||                                                             
            (m = as.length - 1) < 0 ||                                               
            /**                                                                       
             * getProbe()方法返回的是线程中的threadLocalRandomProbe字段                   
             * 它是随机生成的一个值,对于一个确定的线程这个值是固定的                           
             * 除非刻意修改它                                                           
             */                                                                       
            (a = as[getProbe() & m]) == null ||                                       
            !(uncontended = a.cas(v = a.value, v + x)))                               
            longAccumulate(x, null, uncontended);                                     
    }                                                                                 
}                                                                                                                                                     

add()方法的执行流程图:

2、 longAccumulate()源码解析

add()方法中有三种情况会调用longAccumulate()方法,分别是当CAS更新base失败时、当前线程在cells中没有Cell时、当前线程CAS 更新cell失败需要扩容时。因此对于longAccumulate()方法的解读我也分为三个部分,下面是方法的总览:

  • 累加数组cells还未初始化时
//没有其他线程对cell加锁(cellsBusy=1表示加锁了,cellsBusy=0表示未加锁)&cells不存在
//前两步都成功后就会使用CAS将cellsBusy标志修改为1,表示当前线程对cells加锁了
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
      //初始化一个cells数组
       boolean init = false;
       try {                           // Initialize table
          if (cells == as) {
           //创建的cells数组默认是大小是2
            Cell[] rs = new Cell[2];
            //紧跟着就创建一个Cell,这里写的很精妙,通过&操作实现了懒加载
            rs[h & 1] = new Cell(x);
            cells = rs;
            init = true;  //初始化成功标志
           }
        } finally {
           //最后把锁标志释放
           cellsBusy = 0;
        }
        if (init) //如果初始化cells成功就返回
            break;
}

执行流程图

  • 累加数组cells存在但是当前线程的累加槽位为空以及累加数组存在并且当前线程的累加槽位存在的情况
if ((as = cells) != null && (n = as.length) > 0) {              
    //1.累加数组cells存在但是当前线程的累加槽位为空
    if ((a = as[(n - 1) & h]) == null) {                        
        if (cellsBusy == 0) {       // Try to attach new Cell   
            //给当前线程创建一个Cell                                     
            Cell r = new Cell(x);   // Optimistically create    
            //检查cells的加锁状况并使用CAS对cells加锁                        
            if (cellsBusy == 0 && casCellsBusy()) {             
                //是否成功创建标志                                      
                boolean created = false;                        
                try {               // Recheck under lock       
                    Cell[] rs; int m, j;                        
                    //上锁之后再次检查累加数组是否为空&&当前线程在cells中的槽位为空        
                    if ((rs = cells) != null &&                 
                            (m = rs.length) > 0 &&              
                            rs[j = (m - 1) & h] == null) {      
                        //把新创建的Cell赋给cells[j]                   
                        rs[j] = r;                              
                        created = true;                         
                    }                                           
                } finally {                                     
                    //解锁                                        
                    cellsBusy = 0;                              
                }                                               
                if (created)                                    
                    //创建槽位成功返回                                  
                    break;                                      
                //创建槽位失败重试                                      
                continue;           // Slot is now non-empty    
            }                                                   
        }                                                       
        collide = false;                                        
    }                                                           
    else if (!wasUncontended)       // CAS already known to fail
        wasUncontended = true;      // Continue after rehash  
    //2.累加数组存在并且当前线程的累加槽位存在
    //首先,再次尝试CAS更新当前线程所在Cell的值,如果成功了就返回
    else if (a.cas(v = a.value, ((fn == null) ? v + x :         
            fn.applyAsLong(v, x))))                             
        break; 
    //当前累加数组cells的大小是否大于CPU数
    else if (n >= NCPU || cells != as)                          
        collide = false;            // At max size or stale     
    else if (!collide)                                          
        collide = true;                                         
    else if (cellsBusy == 0 && casCellsBusy()) {
        //cells扩容逻辑
        try {                                                   
            if (cells == as) {      // Expand table unless stale
                //扩容cells累加数组,扩容后的大小是原cells的2倍
                Cell[] rs = new Cell[n << 1]
                 //把原cells中的值复制到新cells中   
                for (int i = 0; i < n; ++i)                     
                    rs[i] = as[i];                              
                cells = rs;                                     
            }                                                   
        } finally {  
            //解锁
            cellsBusy = 0;                                      
        }                                                       
        collide = false;   
        //数组扩容后重试
        continue;                   // Retry with expanded table
    }  
    //更新失败或者达到了CPU核心数,重新生成probe,改变线程对应的Cell,并重试
    h = advanceProbe(h);                                        
}                                                               

第一种情况:累加数组cells存在但是当前线程的累加槽位为空的执行流程图

第二种情况:累加数组cells存在并且当前线程的累加槽位存在时对流程图

3、sum()源码解析

sum()方法是获取LongAdder中真正存储的值的大小得到方法,通过把base和cells中所有槽位的值相加得到。

 /**                                                                 
  * Returns the current sum.  The returned value is <em>NOT</em> an  
  * atomic snapshot; invocation in the absence of concurrent         
  * updates returns an accurate result, but concurrent updates that  
  * occur while the sum is being calculated might not be             
  * incorporated.                                                    
  *                                                                  
  * @return the sum                                                  
  */                                                                 
 public long sum() {
     //拿到累加数组cells
     Cell[] as = cells; Cell a;
     //拿到base值
     long sum = base;                                                
     if (as != null) { 
         //如果cells存在那就遍历cells把各个槽位中的值相加得到最终的和
         for (int i = 0; i < as.length; ++i) {                       
             if ((a = as[i]) != null)                                
                 sum += a.value;                                     
         }                                                           
     }                                                               
     return sum;                                                     
 }                                                                     

可以看到sum()方法是把base和所有槽位的值相加得到,那么,这里有一个问题,如果前面已经累加到sum上的Cell的value有修改,不是就没法计算到了么?

答案确实如此,所以LongAdder可以说不是强一致性的,它是最终一致性的

总结

  • (1)LongAdder通过long类型的base和Cell数组来存储值
  • (2)不同的线程会hash到不同的cells累加数组的槽位上更新,减少了竞争
  • (3)LongAdder的性能非常高,最终会达到一种无竞争的状态;
  • (4)LongAdder消除缓存行伪共享是通过sun.misc.Contended注解实现的,它的原理就是在使用了这个注解对对象或字段前后都会加上128字节的padding(空白,不会对原数据产生影响),使用2倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突。
  • (5)cells数组的最大是等于CPU核心数或是小于CPU核心数的最大2的幂,即cells数组的大小不可能大于CPU核心数。

留言区

还能输入500个字符