高并发编程之ConcurrentLinkedQueue和ConcurrentLinkedDeque—源码剖析

一、ConcurrentLinkedQueue简介

在Java开发个过程中我们不可避免的会使用到一些集合类,比如ArrayList、HashMap等,这些集合类型在多线程环境下使用会有线程安全问题。虽然不同的集合类型都有他的线程安全版本,比如ArrayList不是线程安全的,Vector是线程安全。而保障Vector线程安全的方式,是非常粗暴的在方法上用synchronized独占锁,将多线程执行变成串行化。要想将ArrayList变成线程安全的也可以使用Collections.synchronizedList(List list)方法ArrayList转换成线程安全的,但这种转换方式依然是通过synchronized修饰方法实现的,很显然这不是一种高效的方式,同时,队列也是我们常用的一种数据结构,为了解决线程安全的问题,Doug Lea大师为我们准备了ConcurrentLinkedQueue这个线程安全的队列。从类名就可以看的出来实现队列的数据结构是链式。

ConrrentLinkedQueue是一个单向链式结构的无界并发队列。它遵循队列的先进先出规则(FIFO),当我们添加一个元素的时候会添加到队列的尾部,当我们获取一个队列的时候会从队列头部获取。它采用CAS机制保证了对元素操作的线程安全性。

ConcurrentLinkedQueue适用于"单生产,多消费"的场景,同一时刻,允许两个线程(一个消费者一个或多个生产者)同时执行。

二、ConcurrentLinkedQueue的结构

1、ConcurrentLinkedQueue的类图

首先对ConcurrentLinkedQueue在类的继承关系上先有个整体认知:

ConcurrentLinkedQueued由head结点和tail结点组成,每个节点(Node)由结点元素(item)和指向下一个结点(next)的引用组成,节点与节点之间通过next引用关联起来,从而组成了一张链表结构的队列。默认情况下head结点存储的元素内容为空,tail结点等于head节点()

//头节点
private transient volatile Node<E> head;
//尾结点
private transient volatile Node<E> tail;
//构造方法初始化的时候head=tail,并且节点的内容是空的
public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
}
2、内部类Node

Node中有两个属性:(1)元素item,类型有泛型参数决定;(2)下一个节点的引用next。值的注意的是,他们都被volatile修饰了,这可以保证在多线程环境下的内存可见性。另外在Node的构造参数中我们可以看到,它调用了Umsafe类中的本地方法,这进一步保证了线程安全性。

private static class Node<E> {                                    
    volatile E item;                                              
    volatile Node<E> next;                                        
                                                                  
    /**                                                           
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.                
     */                                                           
    Node(E item) {                                                
        UNSAFE.putObject(this, itemOffset, item);                 
    }    
    ......
}
3、操作Node的几个CAS操作

在队列进行出队入队的时候免不了对节点需要进行操作,在多线程就很容易出现线程安全的问题。可以看出在处理器指令集能够支持CMPXCHG指令后,在java源码中涉及到并发处理都会使用CAS操作,那么在ConcurrentLinkedQueue对Node的CAS操作有这样几个:

//使用CAS更新节点中item域 
boolean casItem(E cmp, E val) {                                    
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}  

//懒惰设置next指向  
void lazySetNext(Node<E> val) {                                    
    UNSAFE.putOrderedObject(this, nextOffset, val);                
}                                                                  
   
//使用CAS更新节点的next指向    
boolean casNext(Node<E> cmp, Node<E> val) {                        
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}                                                                  

上面这些方法,底层实际调用了Unsafe类的相关本地方法来实现的,具体的方法实现需要惨嚎hostpot源码了,所以就先看到这里了,目前为止我们需要知道:ConcurrentLinkedQueue是通过CAS+volatile来保证ConcurrentLinkedQueue的线程安全性的即可。

三、入队操作—offer()

入队操作就是将节点添加到队列的尾部的过程。让我们通过每个节点入队的快照来观察下tail节点的变化:

入队主要就干两件事:

  • (1)新建一个节点,并将新节点设置为当前队列尾部节点的下一个节点
  • (2)更新tail节点,如果tail节点的next节点不为空,则将入队节点设置为tail节点,如果tail节点的next节点为空,则将入队的结点设置为tail节点的next节点

不过我们需要注意的是:ConcurrentLinkedQueue当前的tail不一定指向队列真正的尾节点,因为在ConcurrentLinkedQueue中tail是被延迟更新的

public boolean offer(E e) { 
    checkNotNull(e);  
    //入队前,先创建一个入队节点
    final Node<E> newNode = new Node<E>(e);                       
     
    //死循环不断重试
    for (Node<E> t = tail, p = t;;) { 
        //创建一个指向tail的节点引用t和p
        Node<E> q = p.next;
        //如果当前tail就是正真的尾结点,那就使用CAS尝试把新节点设置为tail的next并且把用CAS把tail更新
        if (q == null) {                                          
            // CAS更新next指向                                  
            if (p.casNext(null, newNode)) {                             
                if (p != t) // hop two nodes at a time
                    //CAS更新tail,允许失败
                    casTail(t, newNode);  // Failure is OK.       
                return true;                                      
            }                                                          
        }                                                         
        else if (p == q)
            //寻找正真的tail
            p = (t != (t = tail)) ? t : head;                     
        else                                                      
             //寻找正真的tail            
            p = (p != t && t != (t = tail)) ? t : q;              
    }                                                             
}                                                                 

从源码角度来看,整个入队过程主要做两件事:第一是定位出尾结点;第二是使用CAS算法将入队节点设置为尾结点的后继结点,如果不成功则重试。

四、出队操作—poll()

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下head节点的变化:

从图中可知,并不是每次出队的时候都会更新head节点,当head结点里有元素(item!=null)的时候,直接弹出head结点中的元素,而不会更新head节点。只有当head节点里没有元素的时候,出队操作才会更新head节点。这种做法可以减少使用CAS更新head节点带来的消耗,从而提高效率。

public E poll() {
    restartFromHead:
    for (;;) {
        //p节点就是首节点,也即需要出队的结点
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            //如果节点的的元素不为null,则使用CAS尝试把结点的内容更新为null,表示出队  
            if (item != null && p.casItem(item, null)) {
                //元素出队成功,更新head节点
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。
            // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // p == q,则使用新的head重新开始
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

头节点的元素,首先判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

五、HOPS设计

通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

  • tail节点的更新触发时机:当tail结点的next节点不为空的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail。
  • head节点的更新触发时机:当head结点的item=null的时候,将会执行定位头结点的操作,定位到头结点头使用CAS完成对头节点的删除,并且通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head。

如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS进行tail的更新,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率,所以doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。

对head的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。

六 、ConcurrentLinkedDeque

在J.U.C包下还有一个非阻塞队列的实现就是ConcurrentLinkedDeque,它和ConcurrentLinkedQueue保证线程安全的机制是一样的,都采用CAS+volatile来实现。但是他与ConcurrentLinkedQueue的区别如下:

  • ConcurrentLinkedQueue 单向链表结构的无界并发队列。元素操作按照 FIFO (first-in-first-out 先入先出) 的顺序。适合“单生产,多消费”的场景。

  • ConcurrentLinkedDeque 双向链表结构的无界并发队列。与 ConcurrentLinkedQueue 的区别是该队列同时支持FIFOFILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除)。适合“多生产,多消费”的场景。

ConcurrentLinkedDeque的Node中不仅有next引用指向下一个节点,还有一个prev引用指向前一个节点(和AQS的结构类似)。明白这一点,在去理解ConcurrentLinkedDeque的设计就不困难了。

留言区

还能输入500个字符