高并发编程之等待/通知机制

一、什么是等待/通知机制?

等待/通知机制就是一个【线程A】等待,一个【线程B】通知(线程A可以不用再等待了)。比如生产者和消费者模型,消费者等待生产者生产资源,这是等待,生产者生产好资源通知等待的消费者去消费,这是通知。

等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在java.lang.Object类中

wait()     //代用该方法的线程会进入到WAITING状态,并且当前线程会被放置到等待队列中,只有等待另外线程的唤醒或被中断返回,调用wait()方法后线程会释放对象锁,同时该方法必须在同步方法或同步块中使用,否则会抛出IllegalMonitorStateException异常。
wait(long)     //超时等待一段时间,时间单位是ms,如果没有被唤醒就超时返回
wait(long,int)   //等待一段时间,并不能精确到纳秒,只会多1ms,如果在这一段时间内没哟被唤醒就超时返回
notify()        //唤醒一个在等待对象锁的其他线程,如果有多个线程在等待该对象锁,那么会由线程规划器随机唤醒一个线程,此方法也必须在同步方法或同步块中使用,否则会抛出IllegalMonitorStateException异常。
notifyAll()     //唤醒所有等待此对象锁的线程 

注意
(1)notify()或notifyAll()在调用之后,等待线程不会立即从WAITING状态立即变为RUNNING状态,而是需要等到调动notify()或notifyAll()的方法释放对象锁之后才会从WAITING状态返回。
(2)wait(long,int) 这个方法其实不能精确到ns,这一点从源码就可以看到,他只是在前面的参数上加了1ms:

二、等待/通知机制的经典范式(模板)

(1)等待方(消费者)需遵循如下原则:

  • 获取对象锁
  • 如果条件不满足,那么调用对象的wait()方法,被通知后仍然要检查条件
  • 条件满足则执行对应逻辑
synchronized(对象){
   while(条件不满足){
       对象.wait();
   } 
   对应的逻辑;
}    

(2)通知方(生产者)需遵循如下原则:

  • 获得对象锁
  • 改变条件
  • 通知所有等待该对象锁的线程
synchronized(对象){
    改变条件;
    对象.notifyAll();
}

三、异步模型——生产者和消费者模型

要点:

  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者专心生产资源,不关心数据如何处理,消费者专心消费资源
  • 消息队列是由容量限制的,当容量满了以后生产者停止生产,当空了后消费者停止消费
  • JDK中各种阻塞队列使用的就是这种模式

实现生产者消费者

资源StockBlockQueue.java

public class StockBlockQueue implements Stock {

    //缓冲区
    private BlockingQueue<String> queue = new ArrayBlockingQueue<>(MAX_SIZE);

    @Override
    public String take() {
        String stock = null;
        try {
            //从阻塞队列中阻塞的获取数据
            stock = queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return stock;
    }

    @Override
    public void put(String stock) {
        try {
            queue.put(stock);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Stock接口

public interface Stock {

    int MAX_SIZE=100;

    //获取商品
    String take();

    //放入商品
    void put(String stack);
}

生产者Producer.java

public class Producer implements Runnable {

    //货物,生产者就生产货物
    private final Stock stock;
    //统计生产的数量
    private volatile AtomicInteger tootleNum = new AtomicInteger(0);
    //生产的目标值
    private int targetNum;

    public Producer(Stock stock, int targetNum) {
        this.stock = stock;
        this.targetNum = targetNum;
    }

    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        System.out.println(currentThread.getName() + "开始生产!");
        try {
            while (!currentThread.isInterrupted()) {
                Thread.sleep(new Random().nextInt(3000));
                //UUID随机字串表示生产的货物
                String good = UUID.randomUUID().toString();
                //将货物丢进阻塞队列
                stock.put(good);
                int tootle = tootleNum.getAndIncrement();
                if (tootle > targetNum) {
                    //达到生产额度之后停止生产
                    Thread.currentThread().interrupt();
                }
                System.out.println(currentThread.getName() + "生产数据:" + good + ",累计生产" + tootle);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            //恢复中断标记位
            currentThread.interrupt();
        }
    }
}

消费者Customer.java

public class Customer implements Runnable {

    //货物,消费者消费货物
    private final Stock stock;

    public Customer(Stock stock) {
        this.stock = stock;
    }
	
    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        System.out.println(currentThread.getName() + "开始消费!");
        try {
            while (!currentThread.isInterrupted()) {
                String good = stock.take();
                Thread.sleep(new Random().nextInt(5000));
                System.out.println(currentThread.getName() + "消费数据:"+good);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            //在sleep的时候调用interrupt()会产生异常之后会破坏中断标记位,
            // 这里需要恢复中断标记位
            currentThread.interrupt();
        }
    }

}

其实生产者消费者模式的实现方式还有很多种,这里就展示了一种基于阻塞队列实现的生产者消费者模式的方式,其他几种实现方式可以参考我的另一篇文章生产者消费者模式

四、最后总结一下

1、等待/通知机制

Java中这一机制的实现是wait()/notify()。

  • wait():会将当前线程放入等待队列,让当前线程停止运行直到有其他线程唤醒或者被中断。在调用wait()方法时,当前线程必须已经获得锁,即只能与synchroinzed中使用,执行wait()方法后当前线程会释放锁。

  • notify():只能和synchronized配合使用,当此方法执行后会有由线程规划器随机唤醒一个等待中的线程,执行notify()之后,当前线程不会立即释放锁,被唤醒的线程也不会立即获得该对象锁,而是进入到就绪状态(进入到Monitor的EentryList中)准备竞争此对象锁,如果竞争锁失败,此线程也除非再次代用wait(),否者不会再被放到等待队列中了。

2、sleep()和wait()的区别

(1)sleep()是Thread类中的方法,wait()是Object()中的方法,因此所有对象都有这个方法
(2)wait()必须配合synchronized使用,但是sleep()没有这个要求
(3)如果当前线程持有锁,那么调用sleep()方法之后当前线程不会释放锁,此时其他线程四无法获取这个锁的,但是调用wait()方法会释放锁,其他线程可获得该对象锁

3、Java中wait()和notify为什么定义在Object类中而不是在Thread类中?

Object中的wait(), notify()等函数,和synchronized一样,会对“对象的同步锁”进行操作。

wait()会使“当前线程”等待,因为线程进入等待状态,所以线程应该释放它锁持有的“同步锁”,否则其它线程获取不到该“同步锁”而无法运行!
OK,线程调用wait()之后,会释放它锁持有的“同步锁”;而且,根据前面的介绍,我们知道:等待线程可以被notify()或notifyAll()唤醒。现在,请思考一个问题:notify()是依据什么唤醒等待线程的?或者说,wait()等待线程和notify()之间是通过什么关联起来的?答案是:依据“对象的同步锁”。

负责唤醒等待线程的那个线程(我们称为“唤醒线程”),它只有在获取“该对象的同步锁”(这里的同步锁必须和等待线程的同步锁是同一个),并且调用notify()或notifyAll()方法之后,才能唤醒等待线程。虽然,等待线程被唤醒;但是,它不能立刻执行,因为唤醒线程还持有“该对象的同步锁”。必须等到唤醒线程释放了“对象的同步锁”之后,等待线程才能获取到“对象的同步锁”进而继续运行。

总之,notify(), wait()依赖于“同步锁”,而“同步锁”是对象锁持有,并且每个对象有且仅有一个!这就是为什么notify(), wait()等函数定义在Object类,而不是Thread类中的原因。

留言区

还能输入500个字符