一、从Lock接口说起
锁时用来控制多个线程范文共享资源,保证资源安全性的一种重要手段。在Lock接口出现以前,Java陈旭是靠synchorinzed关键字实现锁功能点的,在jdk1.5之前使用synchroinzed绝对是一个重量级的操作,因为synchroinzed是依赖对象锁实现的,而对象锁(ObjectMonitor)又依赖底层操作系统的mutex lock
指令,这个指令需要操作系统在用户态和内核态不断的切换,这是一个很消耗CPU资源的操作。好的一点是Java在jdk 1.5提供了J.U.C并发包,其中新增的Lock接口具有和synchorinzed同样的锁功能,只是在使用的时候需要我们显示的获取和释放锁。 就像下面这样:
Lock lock=new ReentrantLock();
lock.lock();
try{
//TODO
}finally{
lock.unlock();
}
Lock接口的API
Lock是一个接口,它定义了所的获取和释放的基本操作,Lock接口定义方法有如下几个:
void lock() //获取锁,调用该方法的线程会获取锁
void lockInterruptibly() //可中断地获取锁,该方法可以在获取锁的过程中响应中断
boolean tryLock() //尝试分阻塞的获取锁,调用该方法后立即返回,如果获取到锁了返回true,否者返回false
boolean tryLock(long time,TimeUtil util) throws InterrutptionException //超时获取锁
void unlock() //释放锁
Condition newCondition() //获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁才能调用该组件的wait()方法,调用方法后当前线程释放锁
Lock接口中的这些接口方法在ReentrantLock这个类中有具体的实现,接下来我们来首先了解一下ReentrantLock的基本特性,之后我们在深入源码分析一下这把锁的原理。
二、ReentrantLock基本特点
- (1)可重入:和synchronized一样是一个可重入锁
- (2)可以响应中断:ReenTrantLock提供了一种能够中断等待锁的线程的机制。
- (3)可以通过构造器构造 公平锁 还是 非公平锁 :ReenTrantLock可以指定是公平锁还是非公平锁; synchronized只能是非公平锁。
- (4)可以在获取锁的时候设置超时时间
- (5)需要显示的获取和释放锁
三、深入源码分析RentrantLock
在阅读ReentrantLock的源码之前,我建议你最好是知道CAS、volatile、以及AQS这些基本的概念的,不然你会越读越糊涂。如果你确实对这些知识优点含糊不清或者压根不知道,那么你可以百度先了解一下,或者你也可以在我的博客Java多线程与高并发分类中找到相应的文章学习。
好了,我么回到主题。首先我们来看一下ReentranLock的类图:
从类图中可以直观的看到,Reentrantlock继承了上面我们说的Lock接口,并且在其内部有三个内部类:Sync
、NonfairSync
、FairSync
。其中Sync继承了AQS,NonfairSync
、FairSync
继承了Sync,他们分别是非公平锁好公平锁的实现。接下来我们对这三个部分详细分析一下。
1、ReentrantLock的构造方法
ReentrantLock有两个构造方法,默认无参的构造方法是一个非公平锁,但是我们可以使用另一个有参构造方法构建一个公平锁。下面是两个构造方法的源码:
可以看到,默认的无参构造方法他是直接new了一个非公平同步器(即非公平锁),而在有参构造方法中,我们传入true表示构造一个公平锁,反之就是非公平锁。
你问我 sync 变量是啥?这是整个ReentrantLock唯一的一个成员变量,ReentrantLock就是通过聚合了一个同步器的子类完成了对多个线程的访问控制。
Sync是一个抽象类,因此在使用的时候一定需要一个具体的实现类,NonfairSync
、FairSync
就是具体的实现。
现在我们大概已经理清了ReentrantLock中各部分之间的关系了,接下来,我们就从非公平锁入手分析一下lock.lock()
这条语句的执行流程。下面使我们在程序中调用的lock()方法的源码:
2、深入源码分析获取非公平锁原理
在public void lock()
方法中调用了Sync类的lock()方法,这个方法在NonFairSync中的实现如下:
方法的大致逻辑如下:
- 1、首先调用AQS中的compareAndSetState(int,int)方法先尝试将控制同步状态的变量由0变为1 ,如果操作成功了就表示同步锁获取成功了(这一步就是非公平性产生的原因),之后就将当前线程设置为独占锁的拥有者;
- 如果首次获取尝试失败就会调用acquire()方法排队等待获取锁了,这个方法在AQS中,具体的实现原理请参考深入源码分析AQS实现原理。需要注意的是在acquire(int arg)方法中调用了tryAcquire(int arg)方法,此方法在AQS中没有实现,在NonFiarSync中有实现,但是它最终调用了Sync类中的 nonfairTryAcquire(int arg) 方法,下面是该方法的源码:
方法的逻辑大致如下:
- 1、获取到当前线程以及当前同步队列的锁状态;
- 2、有两个if分支,如果锁状态变量state==0,表示当前同步锁没有被其他线程占用,那就用CAS原子的将state从0改为1,表示获取同步锁成功了,获取成功之后会把当前线程会把自己设置为同步锁的拥有者。
- 3、如果当前线程判断同步锁的状态不为0,那就说明同步锁已经在使用中了,于是当前线程再次判断在使用锁的线程是不是自己,如果是自己,那就再次进去,由于此时这个线程就处于重入的状态了,因此绝对没有其他线程和它争抢state了,因此使用普通的 setState(int arg) 方法把
state++
(此时state就表示当前线程重入的次数)并更新即可。
NonFairSync获取锁的流程图如下所示:
3、深入源码分析获取公平锁原理
在前面我说到过,在ReentrantLock的有参构造方法中传入true参数就可以得到一个公平锁,就像下面这样:
ReentrantLock fireLock = new ReentrantLock(true);
fireLock.lock();
try{
//TODO
}finally {
fireLock.unlock();
}
在public void lock()
方法中调用了Sync类的lock()方法,lock()方法直接将每一个线程排队等待获取锁,这也就是公平的原因,每个线程在同步队列中排队人人都有获取到锁的机会。这个方法在FairSync中的实现如下:
还是同样的配方,还是同样的套路,我们直接看在FairSync中的tryAcquire()方法的实现即可,上图第二个方法就是在FairSync中对tryAcquire()方法的实现,方法的逻辑大致如下:
- 1、得到当前线程,并且获得同步启动器的同步锁状态
- 2、如果同步器的锁状态为0,说明还没有线程持有该锁,那么就判断一下当前队列是否还没有初始化,如果没有初始化,那就先初始化,初始化完成后在使用CAS把同步器的state从0设置为1,设置成功了就表示获取锁成功了,之后还是把自己设置为当前同步器锁的持有者。
- 3、如果当前线程判断同步锁的状态不为0,那就说明同步锁已经在使用中了,于是当前线程再次判断在使用锁的线程是不是自己,如果是自己,那就再次进去,由于此时这个线程就处于重入的状态了,因此绝对没有其他线程和它争抢state了,因此使用普通的 setState(int arg) 方法把
state++
(此时state就表示当前线程重入的次数)并更新即可。
其实细心观察我们就会发现,这个方法的实现和前面我们将的 nonfairTryAcquire(int arg) 方法的实现基本一致。
4、深入源码分析锁的释放
由于ReentrantLock在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:
可以看到,本质释放锁的地方,还是通过AQS实现的。下面是AQS中的release()方法源码:
关于这个方法的解析在我的博客深入源码分析AQS实现原理有说明。这里不再细说,他会首先调用tryRelease()方法尝试释放锁,释放锁成功之后再通过unparkSuccessor()方法(本质是通过LockSupport.unpark()方法)唤醒后继结点。其中tryRelease()在Sync中的实现如下:
方法的逻辑不难理解:
- 1、没执行一次tryRelease()方法状态变量就会执行一次 state-- 操作
- 2、接着判断一下当前线程是否是持有锁的线程,如果不是就会抛出异常,否则进入下一步
- 3、当锁状态state减为0了就说明把锁释放成功了,此时需要把锁的持有线程在AQS中引用设置为null,好让其他线程可以获得锁,最后锁释放成功会返回true。
四、实现自定义同步锁
在学习了AQS和Reentrant之后,为了加深对锁的理解以及应用,我们自己来实现一个简单的互斥锁MutexLock
。
思路:实现我们可以实现Lock接口,然后在我们的锁类中聚合AQS,使用他来管理多个多个线程的同步访问的排序。并且在测试中我们来做一道经典的多线程题目:启动10个线程,第一个线程从1加到10,第二个线程从11加到20...第十个线程从91加到100,最后再把十个线程结果相加。
- 自定义互斥锁
package top.easyblog.mutex;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 基于AQS实现自定义独占锁
*
* @author :huangxin
* @modified :
* @since :2020/04/12 22:13
*/
public class MutexLock implements Lock {
private volatile Sync sync;
public MutexLock() {
this.sync = new Sync();
}
static class Sync extends AbstractQueuedSynchronizer {
/**
* 尝试获取锁
*
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
boolean success = false;
if (compareAndSetState(0, 1)) {
//加锁成功,设置当前线程为owner线程
setExclusiveOwnerThread(Thread.currentThread());
success = true;
}
return success;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
public Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
- 测试类
启动10个线程,第一个线程从1加到10,第二个线程从11加到20...第十个线程从91加到100,最后再把十个线程结果相加
public class MutexLockTest extends Thread {
private int start;
static int sum;
//自定义的互斥锁
private MutexLock lock;
public MutexLockTest(int start) {
this.start = start;
this.lock = new MutexLock();
}
@Override
public void run() {
int tmp=0;
for (int i = 0; i < 10; i++) {
tmp += start + i;
}
lock.lock();
try {
sum += tmp;
}finally {
System.out.println(Thread.currentThread().getName() + "==>" + sum);
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threadList = new Thread[10];
for (int i = 0; i < 10; i++) {
threadList[i] = new MutexLockTest(10 * i + 1);
threadList[i].start();
}
for (int i = 0; i < 10; i++) {
threadList[i].join();
}
System.out.println("Sum is : " + sum);
}
}
- 执行结果