面试常问的几个Java并发工具类你都GET到了吗?

在JDK的并发中提供了几个非常有用的并发工具类:CountDwonLatch、CyclicBarrier、Semaphore。下面我们就一起了解一下这些类的基本使用以及基本原理吧。

PS:最起码你得知道它们是干什么用得,下面列出得代码也建议你跑跑。其实如果你理解AQS的设计原理的话,这些工具类的原理就不难理解了。

一、闭锁 CountDwonLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作之后在进行下面的业务的场景。作用类似于Thread类中的join()方法,但是比join()方法更加强大。下面是一个简单的使用示例:

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        //CountDownLatch的构造参数接收一个int类的参数作为计数器,如果你想等待N个线程,就传入N
        CountDownLatch latch = new CountDownLatch(2);
        new Thread(() -> {
            System.out.println("t1 do with service ......");
            //模拟耗时1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("t1 do with service done!");
            //让锁状态计数器state减1
            latch.countDown();
        }, "t1").start();
        new Thread(() -> {
            System.out.println("t2 do with service ......");
            //模拟耗时1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("t2 do with service done!");
            //让锁状态计数器state减1
            latch.countDown();
        }, "t2").start();

        System.out.println("wait start....");
        latch.await();    //主线程会在这一步阻塞,直到计数器的值减为0了才会退出阻塞状态
        /**
         * 另一个重载方法,可以指定等待的超时时间,超时后计数器的值如果还没减为0也会退出阻塞,用法如下:
         * if(!latch.await(5, TimeUnit.SECONDS)){
         *     System.out.println("wait time out... withdraw blocking state");
         *  }
         */
        System.out.println("wait end.");
    }

}

执行结果

为什么join()方法和他的作用类似还需要CountDownLatch?

答:因为join()的阻塞原理是不断检查join()所属的线程是否存活,如果线程还存活这就继续阻塞,直到线程死亡了才会退出join()阻塞。但是在实际应用场景下我们一般都是使用线程池来维护线程资源的,怎么可能随意的让一个线程死亡,此时join()就没法使用了,CountdDownLatch就可以解决这个问题,CountDownLatch的阻塞原理是仅仅关注计数器是否为0,若为0才保持阻塞,它并不关注持有计数器的其它线程是否完全执行完毕。 下面是一个CountDownLatch配合线程池使用的示例:

public class CountDownLatchTest2 {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch latch = new CountDownLatch(15);
        try {
            threadPool.execute(() -> {
                try {
                    System.out.println("等待其他线程执行完先前操作.......");
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("最后一步操作开始执行....");
                //TODO
            });

            for (int i = 0; i < 15; i++) {
                int tmp = i;
                threadPool.execute(() -> {
                    try {
                        //模拟业务处理耗时tmp s
                        Thread.sleep(tmp*1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    latch.countDown();
                    System.out.println("第" + tmp + "步操作完成");
                });
            }

        } finally {
            //程序执行结束,关闭线程池
            threadPool.shutdown();
        }
    }

}

执行结果

CountDownLatch原理简单分析

CountDownLatch是基于AQS实现的一个同步工具,大部分和同步状态有关的操作在AQS中已经被实现了,CountDownLatch采用在内部聚合AQS的方式,实现了对多个线程的同步控制。在CountDownLatch中有一个内部类Sync继承了AQS,下面是Sync的全部实现:

我们可以看到Sync的构造方法可以接受一个int类型的参数,这个参数就是计数器的值,这个值最终通过setState()方法设置给了AQS中的一个控制同步队列锁状态的字段state。再来看看CountDonwLatch的构造方法就立刻明白了:

在构造方法中直接new了一个Sync,并把我们传入的参数又传给了Sync。

再来看看countDown()方法干了啥:

可以看到,countDown()方法直接调用了同步器的releaseShared()方法,这个方法我的另一篇博客中深入源码分析AQS实现原理中解析过,在这个方法中会首先调用上面Sync子类中的tryReleaseShared()方法,在这个方法中会循环的把AQS中的state字段的值减1,直到减为0了就返回true,表示同步状态释放了。

二、同步屏障 CyclicBarrier

CyclicBarrier([ˈsaɪklɪk ˈbæriə(r)],同步屏障),作用和CountDownLatch类似,CyclicBarrier的构造方法同样也需要一个int类型的参数,当线程调用了CyclicBarrier.await()就进入阻塞。当阻塞的线程数达到了构造参数传入的数目时,所有进入等待状态的线程都被唤醒并继续执行。

CyclicBarrier就象它名字的意思一样,可看成是个屏障, 所有的线程必须到齐后才能一起通过这个屏障。

CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

public class CyclicBarrierTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            threadPool.execute(new Writer(cyclicBarrier));
        }
        threadPool.shutdown();
    }
}

class Writer extends Thread {

    private CyclicBarrier cbr;

    public Writer(CyclicBarrier cbr) {
        this.cbr = cbr;
    }

    @Override
    public void run() {
        try {
            System.out.println("线程" + Thread.currentThread().getName() + ",正在写入数据");
            Thread.sleep((int)Math.random()*1000);
            System.out.println("线程" + Thread.currentThread().getName() + ",写入数据成功.....");
            cbr.await();
            System.out.println("所有线程执行完毕..........");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

执行结果

CyclicBarrier和CountDownLatch的区别?

答:CyclicBarrier的计数器可以使用reset()方法重置为初始值,而CountownLatch的计数器是一次性的。不可以恢复。基于这一点,CyclicBarrier能处理更为复杂的业务场景,比如计算发生错误,可以重置计数器,并让线程重新执行一次。

三、信号量 Semaphore

Semaphore( [ˈseməfɔːr],信号量),它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。

Semaphore可以用来进行流量控制,特别是资源有限的应用场景,比如构建一些对象池,资源池之类的我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。下面用Semphore实现一个简单的可以控制连接数量的数据库连接池:

public class SimpleDBPool {

    //连接池大小
    private int poolSize;

    //数据库连接数组
    private Connection[] connections;

    //流量控制
    private Semaphore semaphore;

    //连接是否在使用中 0表示未使用,1表示在使用
    private AtomicIntegerArray busy;

    public SimpleDBPool(int poolSize) {
        this.poolSize = poolSize;
        this.semaphore = new Semaphore(poolSize);
        connections = new Connection[poolSize];
        busy = new AtomicIntegerArray(poolSize);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new DBUtil("jdbc:mysql://xxx.xxx.xxx.xxx:3306/xxx?autoReconnect=true&useSSL=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai").getConnection();
        }
    }

    //使用数据库连接
    public Connection borrow() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            if (busy.get(i) == 0 && busy.compareAndSet(i, 0, 1)) {
                System.out.println(Thread.currentThread().getName()+"获得数据库连接==>"+connections[i]+" ,开始执行查询任务....");
                return connections[i];
            }
        }
        //程序永远不会执行带这一行
        return null;
    }

    //归还数据库连接
    public boolean free(Connection connection) {
        for (int i = 0; i < poolSize; i++) {
            if (connection == connections[i]) {
                busy.set(i, 0);
                semaphore.release();
                System.out.println(Thread.currentThread().getName()+"查询结束,归还数据库连接"+connections[i]);
                return true;
            }
        }
        return false;
    }


    static class DBUtil {
        private String url = null;    //数据库的链接地址

        public DBUtil(String url) {
            this.url = url;
        }

        //静态代码块,初始化类的时候就加载数据库驱动
        static {
            try {
                Class.forName("com.mysql.cj.jdbc.Driver");    //加载数据库驱动
            } catch (Exception e) {
                e.printStackTrace();
            }
        }


        private Connection getConnection() {
            Connection con = null;
            try {
                //暂时先写死
                con = DriverManager.getConnection(url, "root", "password");
            } catch (SQLException e) {
                System.out.println("数据库操作异常,请检查用户名、密码");
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return con;
        }
    }
}

class Test {
    public static void main(String[] args) {
        //维护3个连接的数据库连接池
        SimpleDBPool dbPool = new SimpleDBPool(3);
        //线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for(int i=0;i<5;i++){
            threadPool.execute(()->{
                Connection connection = dbPool.borrow();
                try {
                    //模拟数据库查询耗时 10s
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                dbPool.free(connection);
            });
        }

        threadPool.shutdown();
    }
}

执行结果

在代码中,虽然有5个线程需要获取数据库连接,但是通过Semaphore限流,每次最大允许3个线程获得数据库连接,从而保证了数据库的安全性。

Semaphore的方法

Semaphore除了上面使用的两个方法: acquire()和release() 之外,还提供了许多有用的方法:

public int availablePerimits()      //返回此信号量中当前可用的许可证数
public int getQueueLength()         //返回正在等待获取许可的线程数
public boolean hasQueuedThreads()    //是否有线程正在等待获取许可
protected void reducePermits(int reduction)   //减少reduction个许可证
protected Collection getQueuedThreads()       //返回所有正在等待获取许可的线程的集合

留言区

还能输入500个字符