高并发编程之ReentrantLock—源码剖析

一、从Lock接口说起

锁时用来控制多个线程范文共享资源,保证资源安全性的一种重要手段。在Lock接口出现以前,Java陈旭是靠synchorinzed关键字实现锁功能点的,在jdk1.5之前使用synchroinzed绝对是一个重量级的操作,因为synchroinzed是依赖对象锁实现的,而对象锁(ObjectMonitor)又依赖底层操作系统的mutex lock指令,这个指令需要操作系统在用户态和内核态不断的切换,这是一个很消耗CPU资源的操作。好的一点是Java在jdk 1.5提供了J.U.C并发包,其中新增的Lock接口具有和synchorinzed同样的锁功能,只是在使用的时候需要我们显示的获取和释放锁。 就像下面这样:

Lock lock=new ReentrantLock();
lock.lock();
try{
    //TODO
}finally{
    lock.unlock();
}
Lock接口的API

Lock是一个接口,它定义了所的获取和释放的基本操作,Lock接口定义方法有如下几个:

void lock()        //获取锁,调用该方法的线程会获取锁
void lockInterruptibly()        //可中断地获取锁,该方法可以在获取锁的过程中响应中断
boolean tryLock()               //尝试分阻塞的获取锁,调用该方法后立即返回,如果获取到锁了返回true,否者返回false
boolean tryLock(long time,TimeUtil util) throws InterrutptionException  //超时获取锁
void unlock()          //释放锁
Condition newCondition()        //获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁才能调用该组件的wait()方法,调用方法后当前线程释放锁       

Lock接口中的这些接口方法在ReentrantLock这个类中有具体的实现,接下来我们来首先了解一下ReentrantLock的基本特性,之后我们在深入源码分析一下这把锁的原理。

二、ReentrantLock基本特点

  • (1)可重入:和synchronized一样是一个可重入锁
  • (2)可以响应中断:ReenTrantLock提供了一种能够中断等待锁的线程的机制。
  • (3)可以通过构造器构造 公平锁 还是 非公平锁 :ReenTrantLock可以指定是公平锁还是非公平锁; synchronized只能是非公平锁。
  • (4)可以在获取锁的时候设置超时时间
  • (5)需要显示的获取和释放锁

三、深入源码分析RentrantLock

在阅读ReentrantLock的源码之前,我建议你最好是知道CAS、volatile、以及AQS这些基本的概念的,不然你会越读越糊涂。如果你确实对这些知识优点含糊不清或者压根不知道,那么你可以百度先了解一下,或者你也可以在我的博客Java多线程与高并发分类中找到相应的文章学习。

好了,我么回到主题。首先我们来看一下ReentranLock的类图:

从类图中可以直观的看到,Reentrantlock继承了上面我们说的Lock接口,并且在其内部有三个内部类:SyncNonfairSyncFairSync。其中Sync继承了AQS,NonfairSyncFairSync继承了Sync,他们分别是非公平锁好公平锁的实现。接下来我们对这三个部分详细分析一下。

1、ReentrantLock的构造方法

ReentrantLock有两个构造方法,默认无参的构造方法是一个非公平锁,但是我们可以使用另一个有参构造方法构建一个公平锁。下面是两个构造方法的源码:

可以看到,默认的无参构造方法他是直接new了一个非公平同步器(即非公平锁),而在有参构造方法中,我们传入true表示构造一个公平锁,反之就是非公平锁。

你问我 sync 变量是啥?这是整个ReentrantLock唯一的一个成员变量,ReentrantLock就是通过聚合了一个同步器的子类完成了对多个线程的访问控制。

Sync是一个抽象类,因此在使用的时候一定需要一个具体的实现类,NonfairSyncFairSync就是具体的实现。

现在我们大概已经理清了ReentrantLock中各部分之间的关系了,接下来,我们就从非公平锁入手分析一下lock.lock()这条语句的执行流程。下面使我们在程序中调用的lock()方法的源码:

2、深入源码分析获取非公平锁原理

public void lock()方法中调用了Sync类的lock()方法,这个方法在NonFairSync中的实现如下:

方法的大致逻辑如下:

  • 1、首先调用AQS中的compareAndSetState(int,int)方法先尝试将控制同步状态的变量由0变为1 ,如果操作成功了就表示同步锁获取成功了(这一步就是非公平性产生的原因),之后就将当前线程设置为独占锁的拥有者;
  • 如果首次获取尝试失败就会调用acquire()方法排队等待获取锁了,这个方法在AQS中,具体的实现原理请参考深入源码分析AQS实现原理。需要注意的是在acquire(int arg)方法中调用了tryAcquire(int arg)方法,此方法在AQS中没有实现,在NonFiarSync中有实现,但是它最终调用了Sync类中的 nonfairTryAcquire(int arg) 方法,下面是该方法的源码:

方法的逻辑大致如下:

  • 1、获取到当前线程以及当前同步队列的锁状态;
  • 2、有两个if分支,如果锁状态变量state==0,表示当前同步锁没有被其他线程占用,那就用CAS原子的将state从0改为1,表示获取同步锁成功了,获取成功之后会把当前线程会把自己设置为同步锁的拥有者。
  • 3、如果当前线程判断同步锁的状态不为0,那就说明同步锁已经在使用中了,于是当前线程再次判断在使用锁的线程是不是自己,如果是自己,那就再次进去,由于此时这个线程就处于重入的状态了,因此绝对没有其他线程和它争抢state了,因此使用普通的 setState(int arg) 方法把state++(此时state就表示当前线程重入的次数)并更新即可。

NonFairSync获取锁的流程图如下所示

3、深入源码分析获取公平锁原理

在前面我说到过,在ReentrantLock的有参构造方法中传入true参数就可以得到一个公平锁,就像下面这样:

 ReentrantLock fireLock = new ReentrantLock(true);
 fireLock.lock();                                 
 try{                                             
     //TODO                                         
 }finally {                                       
     fireLock.unlock();                           
 }                                                

public void lock()方法中调用了Sync类的lock()方法,lock()方法直接将每一个线程排队等待获取锁,这也就是公平的原因,每个线程在同步队列中排队人人都有获取到锁的机会。这个方法在FairSync中的实现如下:

还是同样的配方,还是同样的套路,我们直接看在FairSync中的tryAcquire()方法的实现即可,上图第二个方法就是在FairSync中对tryAcquire()方法的实现,方法的逻辑大致如下:

  • 1、得到当前线程,并且获得同步启动器的同步锁状态
  • 2、如果同步器的锁状态为0,说明还没有线程持有该锁,那么就判断一下当前队列是否还没有初始化,如果没有初始化,那就先初始化,初始化完成后在使用CAS把同步器的state从0设置为1,设置成功了就表示获取锁成功了,之后还是把自己设置为当前同步器锁的持有者。
  • 3、如果当前线程判断同步锁的状态不为0,那就说明同步锁已经在使用中了,于是当前线程再次判断在使用锁的线程是不是自己,如果是自己,那就再次进去,由于此时这个线程就处于重入的状态了,因此绝对没有其他线程和它争抢state了,因此使用普通的 setState(int arg) 方法把state++(此时state就表示当前线程重入的次数)并更新即可。

其实细心观察我们就会发现,这个方法的实现和前面我们将的 nonfairTryAcquire(int arg) 方法的实现基本一致。

4、深入源码分析锁的释放

由于ReentrantLock在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:

可以看到,本质释放锁的地方,还是通过AQS实现的。下面是AQS中的release()方法源码:

关于这个方法的解析在我的博客深入源码分析AQS实现原理有说明。这里不再细说,他会首先调用tryRelease()方法尝试释放锁,释放锁成功之后再通过unparkSuccessor()方法(本质是通过LockSupport.unpark()方法)唤醒后继结点。其中tryRelease()在Sync中的实现如下:

方法的逻辑不难理解:

  • 1、没执行一次tryRelease()方法状态变量就会执行一次 state– 操作
  • 2、接着判断一下当前线程是否是持有锁的线程,如果不是就会抛出异常,否则进入下一步
  • 3、当锁状态state减为0了就说明把锁释放成功了,此时需要把锁的持有线程在AQS中引用设置为null,好让其他线程可以获得锁,最后锁释放成功会返回true。

四、实现自定义同步锁

在学习了AQS和Reentrant之后,为了加深对锁的理解以及应用,我们自己来实现一个简单的互斥锁MutexLock

思路:实现我们可以实现Lock接口,然后在我们的锁类中聚合AQS,使用他来管理多个多个线程的同步访问的排序。并且在测试中我们来做一道经典的多线程题目:启动10个线程,第一个线程从1加到10,第二个线程从11加到20…第十个线程从91加到100,最后再把十个线程结果相加。

  • 自定义互斥锁
package top.easyblog.mutex;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 基于AQS实现自定义独占锁
 *
 * @author :huangxin
 * @modified :
 * @since :2020/04/12 22:13
 */
public class MutexLock implements Lock {

    private volatile Sync sync;

    public MutexLock() {
        this.sync = new Sync();
    }

    static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 尝试获取锁
         *
         * @param arg
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) {
            boolean success = false;
            if (compareAndSetState(0, 1)) {
                //加锁成功,设置当前线程为owner线程
                setExclusiveOwnerThread(Thread.currentThread());
                success = true;
            }
            return success;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        public Condition newCondition() {
            return new ConditionObject();
        }

    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
  • 测试类

启动10个线程,第一个线程从1加到10,第二个线程从11加到20…第十个线程从91加到100,最后再把十个线程结果相加

public class MutexLockTest extends Thread {

    private int start;
    static int sum;
    //自定义的互斥锁
    private MutexLock lock;

    public MutexLockTest(int start) {
        this.start = start;
        this.lock = new MutexLock();
    }

    @Override
    public void run() {
        int tmp=0;
        for (int i = 0; i < 10; i++) {
            tmp += start + i;
        }
        lock.lock();
        try {
            sum += tmp;
        }finally {
            System.out.println(Thread.currentThread().getName() + "==>" + sum);
            lock.unlock();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threadList = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threadList[i] = new MutexLockTest(10 * i + 1);
            threadList[i].start();
        }
        for (int i = 0; i < 10; i++) {
            threadList[i].join();
        }
        System.out.println("Sum is : " + sum);

    }

}
  • 执行结果

留言区

还能输入500个字符