高并发编程之等待/通知机制

一、什么是等待/通知机制?

多个线程之间也可以实现通信,原因就是多个线程共同刚 访问同一个变量。但是这种通信机制不是 “等待/通知”,两个线程完全是主动地读取一个共享变量。简单的说,等待/通知机制就是一个【线程A】等待,一个【线程B】通知(线程A可以不用再等待了)。比如生产者和消费者模型,消费者等待生产者生产资源,这是等待,生产者生产好资源通知等待的消费者去消费,这是通知。

等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在java.lang.Object类中

wait()     //代用该方法的线程会进入到WAITING状态,并且当前线程会被放置到等待队列中,只有等待另外线程的唤醒或被中断返回,调用wait()方法后线程会释放对象锁,同时该方法必须在同步方法或同步块中使用,否则会抛出IllegalMonitorStateException异常。
wait(long)     //超时等待一段时间,时间单位是ms,如果没有被唤醒就超时返回
wait(long,int)   //等待一段时间,并不能精确到纳秒,只会多1ms,如果在这一段时间内没哟被唤醒就超时返回
notify()        //唤醒一个在等待对象锁的其他线程,如果有多个线程在等待该对象锁,那么会由线程规划器随机唤醒一个线程,此方法也必须在同步方法或同步块中使用,否则会抛出IllegalMonitorStateException异常。
notifyAll()     //唤醒所有等待此对象锁的线程 

注意
(1)notify()或notifyAll()在调用之后,等待线程不会立即从WAITING状态立即变为RUNNING状态,而是需要等到调动notify()或notifyAll()的方法释放对象锁之后才会从WAITING状态返回。
(2)wait(long,int) 这个方法其实不能精确到ns,这一点从源码就可以看到,他只是在前面的参数上加了1ms:

二、等待/通知机制的经典范式(模板)

(1)等待方(消费者)需遵循如下原则:

  • 获取对象锁
  • 如果条件不满足,那么调用对象的wait()方法,被通知后仍然要检查条件
  • 条件满足则执行对应逻辑
synchronized(对象){
   while(条件不满足){
       对象.wait();
   } 
   对应的逻辑;
}    

(2)通知方(生产者)需遵循如下原则:

  • 获得对象锁
  • 改变条件
  • 通知所有等待该对象锁的线程
synchronized(对象){
    改变条件;
    对象.notifyAll();
}

三、异步模型——生产者和消费者模型

要点:

  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者专心生产资源,不关心数据如何处理,消费者专心消费资源
  • 消息队列是由容量限制的,当容量满了以后生产者停止生产,当空了后消费者停止消费
  • JDK中各种阻塞队列使用的就是这种模式

实现生产者消费者

资源Resourcs.java

package top.easyblog.wait;

/**
 * 生产者——消费者模型:资源
 *
 * @author :huangxin
 * @modified :
 * @since :2020/04/01 18:21
 */
public class Resources {

    //模拟资源的数量
    private int num;

    //资源的最大允许数存放量
    private static final int MAX_NUM = 10;


    /**
     * 从资源池中取走资源
     */
    public synchronized void remove() {
        if (num > 0) {
            num--;
            System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前资源有" + num + "个");
            //通知生产者生产资源
            notifyAll();
        } else {
            try {
                //没有资源,消费者进入等待状态
                wait();
                System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 向资源池中添加资源
     */
    public synchronized void add() {
        if (num < MAX_NUM) {
            num++;
            System.out.println("生产者" + Thread.currentThread().getName() + "+生产一个资源,当前资源有" + num + "个");
            //通知消费者消费
            notifyAll();
        } else {
            try {
                //资源生产数量过多,让生产者进入等待状态,并释放锁
                wait();
                System.out.println(Thread.currentThread().getName() + "线程进入等待");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

生产者Producer.java

package top.easyblog.wait;

/**
 * 生产者——消费者模型:生产者
 *
 * @author :huangxin
 * @modified :
 * @since :2020/04/01 18:11
 */
public class Producer implements Runnable {

    private Resources resources;

    public Producer(Resources resources) {
        this.resources = resources;
    }


    /**
     * 生产者每1s生产一个资源
     */
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resources.add();
        }
    }
}

消费者Customer.java

package top.easyblog.wait;

/**
 * 生产者——消费者模型:消费者
 *
 * @author :huangxin
 * @modified :
 * @since :2020/04/01 18:34
 */
public class Customer implements Runnable {

    private Resources resources;

    public Customer(Resources resources) {
        this.resources = resources;
    }

    /**
     * 消费者每2s消费一个资源
     */
    public void run() {
        while (true) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resources.remove();
        }
    }
}

四、最后总结一下

1、等待/通知机制

Java中这一机制的实现是wait()/notify()。

  • wait():会将当前线程放入等待队列,让当前线程停止运行直到有其他线程唤醒或者被中断。在调用wait()方法时,当前线程必须已经获得锁,即只能与synchroinzed中使用,执行wait()方法后当前线程会释放锁。

  • notify():只能和synchronized配合使用,当此方法执行后会有由线程规划器随机唤醒一个等待中的线程,执行notify()之后,当前线程不会立即释放锁,被唤醒的线程也不会立即获得该对象锁,而是进入到就绪状态(进入到Monitor的EentryList中)准备竞争此对象锁,如果竞争锁失败,此线程也除非再次代用wait(),否者不会再被放到等待队列中了。

2、sleep()和wait()的区别

(1)sleep()是Thread类中的方法,wait()是Object()中的方法,因此所有对象都有这个方法
(2)wait()必须配合synchronized使用,但是sleep()没有这个要求
(3)如果当前线程持有锁,那么调用sleep()方法之后当前线程不会释放锁,此时其他线程四无法获取这个锁的,但是调用wait()方法会释放锁,其他线程可获得该对象锁

3、Java中wait()和notify为什么定义在Object类中而不是在Thread类中?

Object中的wait(), notify()等函数,和synchronized一样,会对“对象的同步锁”进行操作。

wait()会使“当前线程”等待,因为线程进入等待状态,所以线程应该释放它锁持有的“同步锁”,否则其它线程获取不到该“同步锁”而无法运行!
OK,线程调用wait()之后,会释放它锁持有的“同步锁”;而且,根据前面的介绍,我们知道:等待线程可以被notify()或notifyAll()唤醒。现在,请思考一个问题:notify()是依据什么唤醒等待线程的?或者说,wait()等待线程和notify()之间是通过什么关联起来的?答案是:依据“对象的同步锁”。

负责唤醒等待线程的那个线程(我们称为“唤醒线程”),它只有在获取“该对象的同步锁”(这里的同步锁必须和等待线程的同步锁是同一个),并且调用notify()或notifyAll()方法之后,才能唤醒等待线程。虽然,等待线程被唤醒;但是,它不能立刻执行,因为唤醒线程还持有“该对象的同步锁”。必须等到唤醒线程释放了“对象的同步锁”之后,等待线程才能获取到“对象的同步锁”进而继续运行。

总之,notify(), wait()依赖于“同步锁”,而“同步锁”是对象锁持有,并且每个对象有且仅有一个!这就是为什么notify(), wait()等函数定义在Object类,而不是Thread类中的原因。

留言区

还能输入500个字符