高并发编程之AQS(AbstractQueuedSynchornizer)—源码剖析

AQS概述

AQS是 AbstractQueuedSynchorinizer 的简写,中文名是队列同步器。AQS是Java实现阻塞式锁和同步工具的基石。它使用一个int类型的成员变量state表示同步状态,通过内置的一个FIFO队列完成线程的排队工作。AQS是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。

一、AQS的实现分析

1、使用一个共享变量state实现互斥

AQS使用一个int成员变量state来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。

状态信息通过getState(),setState(),compareAndSetState()进行操作。AQS支持两种类型的同步方式:

  • 独占式:同一时刻只允许一个线程独占式的访问资源;
  • 共享式:同一时刻允许多个共享式的线程访问资源;

共享式访问的资源时,其他共享式的访问均被允许,而独占式访问被阻塞;独占式访问资源时,同一时刻的其他访问均被阻塞。

这两种模式在AQS分别使用两个变量表示:

这样设计方便使用者实现不同类型的同步组件,独占式如ReentrantLock,共享式如Semaphore,CountDownLatch,组合式的如ReentrantReadWriteLock。总之,AQS为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。

同步器的设计是基于模板方法模式的,一般的使用方式是这样:

  • (1)使用者继承AbstractQueuedSynchronizer并实现自定的方法。
  • (2)将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。这其实是模板方法模式的一个很经典的应用。继承同步器后必须重写如下方法:
protected boolean tryAcquire(int arg)       //独占式的获取同步状态
protected boolean tryRelease(int arg)       //独占式释放同步状态
protected int tryAcquireShared(int arg)   //共享式的获取同步状态,返回值阿玉等于0表示获取成功
protected boolean tryReleaseShared(int arg)   //共享式的释放同步状态
protected boolean isHeldExclusively()        //当前同步器是否在独占模式下被线程使用
2、同步队列的基本数据结构

上面说到,AQS在其内部维护了一个队列,可是我还要告诉你AQS 内部的等待队列实质是一个双向链表 ,链表中的每一个节点的构造如下(抛开其他信息,先了解一下他的底层数据结构):

public abstract class AbstractQueuedSynchronizer{
    private volatile Node head;    //等待队列的头结点
    private volatile Node tail;    //等待队列的尾结点
    private volatile int state;    //锁状态
    
    //同步器中的结点是用来保证获取同步状态失败的线程引用、
    //等待条件以及前驱和后继结点、结点的锁类型以及名称的
    static class Node {
        volatile Node perv;    //前驱结点
        volatile Node next;    //后继结点
        /***********和数据结构无关的字段****************/
        volatile Thread thread;   //获取同步的线程
        Node nextWaiter;   //请求的是共享锁还是独占锁
        volatile int waitStatus;  //等待状态
    }
}

同步器依赖内部的同步队列来完成同步状态的管理,当线程获取同步状态失败的时,同步器会将当前线程以及等待信息等构成一个节点(Node)并将其加入到同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点的线程唤醒,使其尝试获取同步状态。

其中每一个Node结点在Node这个类中还定义了4种等待状态(waitStatus值):

  • CANCELLED(1):表示当前线程已经被取消(等待超时或者被中断了)
  • SIGNAL(-1):表示后继线程需要被唤醒
  • CONDITION(-2):表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中
  • PROPAGATE(-3):表示下一次共享式同步状态会被无条件地传播下去
  • 0:当一个Node被初始化时waitStatus的默认值
3、向同步队列加入结点实现线程排队

结点加入到同步队列中以后进入一个自旋的过程,每个结点都在观察,当条件满足后,如果获取到了同步状态(锁)就可以从自旋状态病虫同步队列中移除,否则依旧留在同步队列自旋。同步队列是遵循FIFO的,首节点表示获取同步状态成功的结点,首节点的线程在释放同步状态时,将会唤醒后继结点,而后继结点将会在获取同步状态成功将会在此时将自己设置为头结点。

设置首节点是通过获取到同步状态的线程来完成的,有只有一个线程能够获取到同步状态,一次设置头结点的方法不需要使用CAS,他只需要将首节点设置成为原结点的后继结点即可(head->next=this.next;)。

4、阻塞和唤醒线程。

当线程加入到同步队列之后,通过某种方法实现对线程的阻塞和唤醒。在AQS中使用的是LockSupport。LockSupport.park()可以阻塞当前线程,LockSupport.unpark()可以指定唤醒某个线程。LockSupport底层基于Unsafe类,这个类的提供了一些绕开JVM的更底层功能,功能十分强大。

二、AQS源码分析

1、独占式同步锁的获取与释放流程分析

通过调用同步器的 acquire(int arg) 方法可以获取同步状态,调用了该方法后的线程不会对中断操作响应。方法的代码如下:

方法的主要完成了同步状态获取、结点构造、加入同步队列以及在同步队列中自旋等操作。主要逻辑是:

  • 首先调用tryAcquire(int arg)方法,该方法的作用是独占式获取同步状态,底层依赖CAS实现,如果获取成功返回true,否则返回false;需要注意的是,这个方法在AQS内部并没有真正的逻辑,他把这部分交给子类实现。
  • 如果获取同步失败,则构造同步结点(独占式的,Node.EXCLUSIVE同一时刻只能有一个线程获取到同步锁)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部;
  • 最后调用acquireQueued(Node node,int arg)方法使得该线程自旋。

进一步分析结点是如何构造以及加入到同步队列中的,下面是 addWaiter() 方法的源码:

  • 首先把当前线程以及独占式参数为参数构造一个新节点node;
  • 获取当前尾结点,如果尾结点不为空就把node的前驱结点设置为当前尾结点,之后在使用CAS把当前尾结点的后继结点这是为node,成功之后返回node;
  • 如果当前尾结点为空,表示当前同步队列还没有初始化,此时会调用enq(Node node)方法进行初始化同步队列。

如果pred为null(说明同步队列还没与初始化),或者pred在极端情况下被背的线程修改了。其实就需要看一下enq(Node node)方法的实现了。 enq(Node node) 方法的源码如下:

我们可以看到,在enq(Node node)方法中就是一个死循环配合CAS在不断的重试(自旋),直到CAS设置成功了才会返回,方法的主要逻辑如下:

  • 获取当前同步队列的尾结点,判断如果是null(说明还没有初始化同步队列),就执行初始化,通过CAS为同步队列设置头结点(第一个结点),此过程循环配合CAS直到执行成功为止;请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。
  • 同步器初始化成功在配合CAS以及循环设置队尾,也是直到成功为止。在不断重试的过程中,只有通过CAS把结点设置成为了尾结点之后,当前线程才能从方法返回,否则就不断的重试。通过不断的重试,以及CAS机制,将并发的添加节点的操作串行化,从而保证了线程安全。

结点加入到同步队列后,线程就会进入到自旋的过程,每个线程都在观察,当条件满足,获取到了同步状态,就可以从自旋过程退出,否则依旧自旋。下面是 acquireQueued(Node node,int arg) 方法的源码:

在acquireQueued()方法中,当前线程不断的尝试获取同步状态,从代码中我们也看到了。只有当前驱是head的结点才可以尝试获取锁,这么做的理由有以下两个:

  • (1)这样做可以维护队列的FIFO特性
  • (2)头结点是成功获取到同步状态的结点,二头结点的线程释放了同步状态后,将会唤醒其后继结点,后继结点的线程被唤醒或需要检查自己的前驱结点是否是头结点

而acquireQueued()方法的逻辑主要如下:

  • 首先获取当前节点的前驱结点,如果前驱节点是head,那么当前结点可以尝试获取锁;
  • 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点;
  • 如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
  • 如果在执行过程中抛出了异常则取消锁的获取并把当前结点出队

独占锁的获取流程( acquire(int arg) 方法)的执行流程可以用下图说明:

当一个线程获得同步状态并执行完相应的逻辑之后,就需要释放同步同步状态,是的后续接待也可以获得同步状态。通过调用同步器的relaese(int arg)方法可以释放同步锁状态,该方法在释放了昂前线程的同步状态之后会唤醒其后继结点。下面是该方法的源码:

方法会首先调用 tryRelaese(int args) 释放同步状态,这个方法在AQS没有实现,需要子类实现具体的逻辑。tryRelaese(int args)返回true之后调用 unparkSuccessor(h) 方法,在这个方法内会使用LockSupport.unpark()方法唤醒当前结点的后继结点。关于LockSupport的原理可以参考我的博客LockSupport深入源码剖析

分析到这,适当的做个总结

在获取同步锁状态时,同步器维护了一个同步队列,获取同步状态失败的线程都会被包装成一个Node结点加入到同步器的尾部,在添加的时候是以CAS的方式设置尾结点的,并且添加进队列中的结点自旋;移除同步队列的条件是,当前结点的前驱结点是head结点并且成功获取到同步状态。在释放同步锁的时,同步器调用tryRelease(int arg) 方法释放同步状态,并且调用 LockSupport.unpark(Thread t) 方法唤醒头结点的后续结点。

2、共享式同步状态的获取与释放流程分析

通过调用同步器的**acquireSahared(int arg)**方法可以共享式的获取同步状态,源码如下:

在acquireSahared(int arg)方法中,同步器调用tryAcquire(int arg)方法尝试获取同步状态,这个方法同样地AQS没有没有具体实现,需要子类来实现。这个tryAcquire(int arg)方法返回一个int类型的值,当返回值大于等于0时,表示获取到了同步状态;否则就会调用doAcquireShared(int arg)以自旋的方式获取同步状态。doAcquire(int arg)方法源码如下:

在方法doAcquireShared(int arg)自旋的过程中,如果当前结点的前驱结点为head时,还是会调用tryAcquire(int arg)尝试获取同步状态,如果返回值大于等于0,表示获取同步状态成功,于是就可以退出自旋了。同样地,如果在自选的过程中发生了异常就会退出自旋,并且把当前结点从同步器中移除。

当共享式的同步状态的线程需要释放同步状态时同步器可以调用releaseSgared(int arg)方法释放同步状态。

在方法内部调用了tryReleaseShared(int arg)这个方法也是需要子类去实现的,当到方法返回true时表明释放同步状态成功,如果首次调用释放同步状态失败就会调用doReleaseShard(int arg)方法以自旋的方式不断重试 释放同步状态。同时由于这是共享式的,为了保证多个线程同时访问的并发性,方法内部采用了CAS。最终方法会唤醒头头结点的下一个结点,使用方法任然是 unparkSuccessor(h) ,这个方法在上面分析过,他其实是调用了LockSupport.unpark()方法。

留言区

还能输入500个字符