分布式锁的三种实现方案

一、为什么要使用分布式锁

我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的Java多线程的18般武艺进行处理,并且可以完美的运行!

注意这是单机应用,也就是所有的请求都会分配到当前服务器的JVM内部,然后映射为操作系统的线程进行处理!而这个共享变量只是在这个JVM内部的一块内存空间!

后来业务发展,需要做集群,一个应用需要部署到几台机器上然后做负载均衡,大致如下图:

上图我们可以看到,在集群环境下不同服务器中的变量是单独的JVM中用于一块独立的空间,如果仅仅使用Java内置的锁他只是JVM级别的锁,也即使说只有分布式服务里面,多个服务属于不同进程,用普通的同步锁会失效,因为不同进程里面,就没有什么内存共享之类的说法。

因此为了保证一个方法或属性在高并发情况下的同一时间只能被一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

二、分布式锁应该具备哪些条件

分析分布式锁的三种实现方式之前,先了解一下分布式锁应该具备哪些特性:

  • 1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
  • 2、高可用的获取锁与释放锁;
  • 3、高性能的获取锁与释放锁;
  • 4、具备可重入特性;
  • 5、具备锁失效机制,防止死锁;
  • 6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

三、分布式锁的三种实现方式以及Redis分布式锁的具体实现

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足 一致性(Consistency)、 **可用性(Availability)**和 分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。

在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行,当前业内公认的实现分布式锁有三套方案:

  1. 基于数据库实现分布式锁
  2. 基于缓存(Redis等)实现分布式锁
  3. 基于Zookeeper实现分布式锁
1、基于数据库实现分布式锁
方案一:基于数据库表的实现

要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。

当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。

创建这样一张数据库表:

CREATE TABLE `metux_lock` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
    `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
    `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
    `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
    UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

当我们想要获取锁时,执行以下SQL:

insert into metux_lock(method_name,desc) values (‘method_name’,‘desc’)

因为我们对method_name做了唯一性约束,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行临界区内容。

当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:

delete from metux_lock where method_name ='method_name'

上面这种简单的实现有以下几个问题:

  • 1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。

  • 2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。

  • 3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。

  • 4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

当然,我们也可以有其他方式解决上面的问题。

针对 数据库是单点问题可以搞数据库集群,数据之前双向同步。一旦挂掉快速切换到备库上。

针对 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。

针对 非阻塞的?搞一个while循环,直到insert成功再返回成功。

针对 非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

方案二:基于数据库排他锁的实现

除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。

我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。 基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作(伪代码):

public boolean lock(){
    connection.setAutoCommit(false)
    while(true){
        try{
            result = "select * from metux_lock where method_name=xxx for update";
            if(result==null){
                return true;
            }
        }catch(Exception e){
           //TODO
        }
        sleep(1000);
    }
    return false;
}

在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。

我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,再通过以下方法解锁:

public void unlock(){
    connection.commit();
}

这种数据库分布式锁方案有效解决上面提到的无法释放锁和阻塞锁的问题。

  • 阻塞锁:for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。
  • 服务宕机自动释放锁:使用这种方式,服务宕机之后数据库会自己把锁释放掉。但是还是无法直接解决数据库单点可用性和可重入问题。
方案总结

总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。这两种方式各有各自的优缺点,但是我想说使用数据库实现分布式锁的方案虽然可行但是你要想想,我们的业务系统都搞成分布式的了,那就说明我们的业务系统已经有很大的流量了,这时候数据库的压力本生就很大了,此时在人为的给数据库增加压力,我个人认为这种方式可行但不可取!

2、基于缓存(Redis等)实现分布式锁

相比较于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点。而且很多缓存是可以集群部署的,可以解决单点问题。目前有很多成熟的缓存产品,包括Redis,memcached等。这里我主要讲解一下使用Redis做分布式锁的方式——通过Redis的SETNX key value命令配合lua脚本保证命令执行的原子性。下面我们来一步步分析这其中的”坑“,并解决它们。

首先按照基本想法我实现一个简单版本的如下:

@Service
public class ShopCarServiceImpl {

    @Autowired
    private RedisUtils redisUtils;

    private static final String MUTEX_LOCK_KEY = "product";

    /**
     * @param num 商品的数量
     * @return 成功返回 true ,失败返回 false
     */
    public boolean submitShopCar(int num) {
        //获取锁(非阻塞)
        Boolean lock = redisUtils.setNX(MUTEX_LOCK_KEY, "lock");
        if (!lock) {
            System.out.println("获取Redis锁失败,直接返回");
            return false;
        }
        System.out.println("获取Redis锁成功。");
        Integer stock= (Integer) redisUtils.get("stock");
        if (stock > 0 && num <= stock) {
            //下单
            stock = stock - num;
            redisUtils.set("stock", stock);
            System.out.print("库存充足。下单成功。剩余库存:" + stock+"\n");
        } else {
            System.out.print("库存不足,无法下单。库存:" + stock+"\n");
        }
        //释放锁
        redisUtils.delete(MUTEX_LOCK_KEY);
        return true;
    }

}

这个版本中很明显的有很多错误,首先一个可能的错误就是获取锁的线程在执行程序是突然发生异常了咋办?

那么解决问题我们可以使用try-catch-finally来解决,更该版本如下:

public boolean submitShopCar(int num) {
        //获取锁(非阻塞)
        Boolean lock = redisUtils.setNX(MUTEX_LOCK_KEY, "lock");
        if (!lock) {
            System.out.println("获取Redis锁失败,直接返回");
            return false;
        }
        try {
            System.out.println("获取Redis锁成功。");
            Integer stock= (Integer) redisUtils.get("stock");
            if (stock > 0 && num <= stock) {
                //下单
                stock = stock - num;
                redisUtils.set("stock", stock);
                System.out.print("库存充足。下单成功。剩余库存:" + stock + "\n");
            } else {
                System.out.print("库存不足,无法下单。库存:" + stock + "\n");
            }
        } finally {
            //释放锁
            redisUtils.delete(MUTEX_LOCK_KEY);
        }
        return true;
    }

这般代码还不够健壮,因为如果在线程释放锁的时候突然停电了或者其他意外导致这个锁标志没法移除,那么这就是比较糟糕的,对于其他线程来说这个锁就不可用了。

解决这个问题我们可以在设置锁的时候给锁设置一个过期时间,比如30s过期。这样一来就保证了锁的可用性,即使一个节点突然发生问题导致锁没有主动释放,那么也可通过Redis的超时机制来保护锁的可用性。

@Service
public class ShopCarServiceImpl {


    @Autowired
    private RedisUtils redisUtils;

    private static final String MUTEX_LOCK_KEY = "product";

    private static final int LOCK_TIMEOUT=30;

    /**
     * @param num 商品的数量
     * @return 成功返回 true ,失败返回 false
     */
    public boolean submitShopCar(int num) {
        //获取锁(非阻塞),直接使用带超时时间的setNX,保证它的原子性
        Boolean lock = redisUtils.setNX(MUTEX_LOCK_KEY, "lock",LOCK_TIMEOUT);
        if (!lock) {
            System.out.println("获取Redis锁失败,直接返回");
            return false;
        }

        try {
            System.out.println("获取Redis锁成功。");
            Integer stock= (Integer) redisUtils.get("stock");
            if (stock > 0 && num <= stock) {
                //下单
                stock = stock - num;
                redisUtils.set("stock", stock);
                System.out.print("库存充足。下单成功。剩余库存:" + stock + "\n");
            } else {
                System.out.print("库存不足,无法下单。库存:" + stock + "\n");
            }
        } finally {
            //释放锁
            redisUtils.delete(MUTEX_LOCK_KEY);
        }
        return true;
    }
}

由于我们目前这个分布式锁还只是一个非阻塞锁,也就是当线程没有获取到锁就立即返回失败,这在有些场景下是非常不友好的一种设计,因此我们需要把它改造成阻塞式锁。

@Service
public class ShopCarServiceImpl {

    @Autowired
    private RedisUtils redisUtils;

    private static final String MUTEX_LOCK_KEY = "product";

    private static final int LOCK_TIMEOUT=30;

    /**
     * @param num 商品的数量
     * @return 成功返回 true ,失败返回 false
     */
    public boolean submitShopCar(int num) {
        //获取锁
        Boolean lock = redisUtils.setNX(MUTEX_LOCK_KEY, "lock",LOCK_TIMEOUT);
        //自旋,不返回,直到获取锁成功,这里不一定需要这么写,只是提供一个解决问题的思路
        while(!lock){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock = redisUtils.setNX(MUTEX_LOCK_KEY, "lock",LOCK_TIMEOUT);
        }

        try {
            System.out.println("获取Redis锁成功。");
            Integer stock= (Integer) redisUtils.get("stock");
            if (stock > 0 && num <= stock) {
                //下单
                stock = stock - num;
                redisUtils.set("stock", stock);
                System.out.print("库存充足。下单成功。剩余库存:" + stock + "\n");
            } else {
                System.out.print("库存不足,无法下单。库存:" + stock + "\n");
            }
        } finally {
            //释放锁
            redisUtils.delete(MUTEX_LOCK_KEY);
        }
        return true;
    }
}

现在这把Redis实现的分布式锁足够安全了吗?我认为还不够!因为在释放锁的时候没有控制,任何线程都可以把这个key为“product”的锁标志删除了,这样看来,我们的这个锁还是不够安全。

解决这个问题的方法是在设置key的时候向vlaue写一个唯一的随机值(比如UUID),在删除的时候需要比对key的值是否是在上锁的时候的值一样,如果一样才可以删除这个值,否者不允许删除。

/**                                                                         
 * @param num 商品的数量                                                         
 * @return 成功返回 true ,失败返回 false                                            
 */                                                                         
public boolean submitShopCar(int num) { 
    //加锁之前生成一个唯一的随机值
    String uuid = UUID.randomUUID().toString();                             
    //获取锁                                                            
    Boolean lock = redisUtils.setNX(MUTEX_LOCK_KEY, uuid, LOCK_TIMEOUT);    
    //自旋,不直接返回,直到获取锁成功                                                     
    while (!lock) {                                                         
        try {                                                               
            Thread.sleep(1000);                                             
        } catch (InterruptedException e) {                                  
            e.printStackTrace();                                            
        }                                                                   
        uuid = UUID.randomUUID().toString();                                
        lock = redisUtils.setNX(MUTEX_LOCK_KEY, uuid, LOCK_TIMEOUT);        
    }                                                                       
                                                                            
    try {                                                                   
        System.out.println("获取Redis锁成功。");                                  
        Integer stock = (Integer) redisUtils.get("stock");                  
        if (stock > 0 && num <= stock) {                                    
            //下单                                                            
            stock = stock - num;                                            
            redisUtils.set("stock", stock);                                 
            System.out.print("库存充足。下单成功。剩余库存:" + stock + "\n");             
        } else {                                                            
            System.out.print("库存不足,无法下单。库存:" + stock + "\n");               
        }                                                                   
    } finally {   
        //释放锁之前比对唯一值相等才可以释放锁
        if(uuid.equalsIgnoreCase((String) redisUtils.get(MUTEX_LOCK_KEY))) {
            //释放锁                                                           
            redisUtils.delete(MUTEX_LOCK_KEY);                              
        }                                                                   
    }                                                                       
                                                                            
    return false;  
}   

现在我们实现这把锁安全了吗?表面上看似安全了,但是仔细一想就会发现:我们的锁是有一个过期时间的,在比较繁杂的业务执行过程中执行业务的时间可能就会超过锁的过期时间,锁突然过期了这对于业务系统安全也是一种实实在在的威胁。

解决这个问题的方法就是在主线程执行业务的时候再开一个线程当检测到锁快过期了就异步的为锁”续命“,直到主线程把任务执行完之后,异步线程死亡。

@Service
public class ShopCarServiceImpl {


    @Autowired
    private RedisUtils redisUtils;

    private static final String MUTEX_LOCK_KEY = "product";

    private static final int LOCK_TIMEOUT = 30;

    /**
     * @param num 商品的数量
     * @return 成功返回 true ,失败返回 false
     */
    public boolean submitShopCar(int num) {
        String uuid = UUID.randomUUID().toString();
        //获取锁(非阻塞)
        Boolean lock = redisUtils.setNX(MUTEX_LOCK_KEY, uuid, LOCK_TIMEOUT, TimeUnit.SECONDS);
        //自旋,不返回,直到获取锁成功
        while (!lock) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            uuid = UUID.randomUUID().toString();
            lock = redisUtils.setNX(MUTEX_LOCK_KEY, uuid, LOCK_TIMEOUT, TimeUnit.SECONDS);
        }
        System.out.println("获取Redis锁成功。");
        //开启一个定时任务,每当锁还剩下1/3的时间时就续命
        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
        scheduledExecutor.schedule(() -> {
            //定时任务给锁续命
            redisUtils.expire(MUTEX_LOCK_KEY, LOCK_TIMEOUT, RedisDBSelector.DB_0);
            System.out.println("给锁续命成功");
        }, LOCK_TIMEOUT * 2 / 3, TimeUnit.SECONDS);

        try {
            Integer stock = (Integer) redisUtils.get("stock");
            if (stock > 0 && num <= stock) {
                //下单
                stock = stock - num;
                Thread.sleep(30001);
                redisUtils.set("stock", stock);
                System.out.print("库存充足。下单成功。剩余库存:" + stock + "\n");
            } else {
                System.out.print("库存不足,无法下单。库存:" + stock + "\n");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (uuid.equalsIgnoreCase((String) redisUtils.get(MUTEX_LOCK_KEY))) {
                //关闭定时任务
                scheduledExecutor.shutdown();
                //释放锁
                redisUtils.delete(MUTEX_LOCK_KEY);
            }
        }

        return false;
    }
}

这里我直接使用一个定时任务线程池,每当锁时间大约剩下1/3的时候就给锁续命,而后当执行完任务之后主线程关闭定死任务并释放锁,下一个线程就可以进来处理业务了。下面是执行截图:

方案总结

到这里,我们已经基本实现了redis分布式锁,Redis 实现分布式锁主要步骤如下:

  1. 指定一个key作为锁标记,存入 Redis 中,并且指定一个 唯一的用户标识 作为 value。
  2. 当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性。
  3. 给key设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性。
  4. 当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 只有加锁的人才能释放锁
3、利用zookeeper实现分布式锁

让我们来回顾一下Zookeeper节点的概念:

Zookeeper的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做Znode。Znode有可以分为四种类型:

  • 持久节点 (PERSISTENT):默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在 。
  • 持久节点顺序节点(PERSISTENT_SEQUENTIAL):所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号。
  • 临时节点(EPHEMERAL):和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL):临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。

Zookeeper分布式锁的原理恰恰应用了临时顺序节点的特性,并且根据CAP理论,它是基于CP的一种分布式锁实现。具体实现原理和代码可以参考我的另一篇博文Zookeeper分布式锁的原理和具体实现

参考链接

留言区

还能输入500个字符