队列是一种访问受限的线性数据结构,它有两个基本操作:在队列尾部加入元素和从队列头部移除元素。在我们日常开发中,经常用来并发操作数据。java中提供了一些应用比较广泛的特殊队列:一种是以ConcurrentLinkedQueue为代表的非阻塞队列;另一种是以BlockingQueue接口为代表的阻塞队列。

一、初识阻塞队列
在J.U.C包中提供的BlockingQueue很好的解决了多线程中如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
BlockingQueue核心方法:
public interface BlockingQueue<E> extends Queue<E> {
//将给定元素添加到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
boolean add(E e);
//将给定的元素添加到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
boolean offer(E e);
//将元素添加到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;
//将给定元素在给定的时间内添加到队列中,如果设置成功返回true, 否则返回false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
E take() throws InterruptedException;
//在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//获取队列剩余的空间。
int remainingCapacity();
//从队列中移除指定的值。
boolean remove(Object o);
//判断队列中是否拥有该值。
public boolean contains(Object o);
//将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c);
//指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c, int maxElements);
}
阻塞队列的线程安全是依赖ReentrantLock实现的,并且借助于Condition中的await()和signal()\signal()方法实现对线程的阻塞和唤醒。
二、阻塞队列成员详细介绍
1、ArrayListBlockingQueue
ArrayListBlockingQueue是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁(默认情况下是非公平锁)。
public ArrayBlockingQueue(int capacity) {
//默认使用的是非公平锁
this(capacity, false);
}
//通过boolean类型的参数可以控制使用公平锁还是非公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2、LinkedBlockingQueue
LinkedBlockingQueue是一个用单链表实现的有界阻塞队列。此队列的默认长度和最大长度为Integer.MAX_VALUE
。此队列按照先进先出的原则对元素进行排序。
3、PriorityBlockingQueue
一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
4、DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列基于PriorityBlockingQueue实现。队列中的元素必须实现Delayed接口,在创建元素是可以指定多久才能从队列中获取当前元素。DelayQueue可以用于 缓存系统设计 和 定时任务调度 这样的应用场景。
5、SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。它支持公平访问队列。默认情况下线程采用非公共策略访问队列。当使用公平锁的时候,等待的线程会采用先进先出的顺序访问队列。
//默认使用非公平锁
public SynchronousQueue() {
this(false);
}
//通过boolean类型的参数可以控制使用公平锁还是非公平锁
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
6、LinkedTransferQueue
LinkedTransferQueue是一个由链表实现的无界阻塞Transfer队列。相对于其他阻塞队列,LinkedTransferQueue多了transfer和tryTransfer方法
7、LinkedBlockingDeque
LinkedBlockingDeque是一个由双链表实现的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
接下来重点介绍下:ArrayListBlockingQueue、LinkedBlockingQueue以及DelayQueue
三、阻塞队列原理及应用
阻塞队列的原理就是基于等待/通知机制
实现的,即当生产者往满的队列里添加元素的时候会阻塞生产者,当消费者消费了队列中的一个元素后户通知生产者生产。只不过在阻塞队列中没有使用Obeject类中的wait()和notify(),而是使用了Conditon接口下的await()和signal()方法来实现了对某个具体的线程的挂起和唤醒,底层调用的是LockSupport类的静态方法prak()和unpark()方法实现的。其中Condition接口的实例是通过ReentrantLock的newCondition()
方法得到的,实际的实现是在AQS中,接下来我们就以ArrayListBlockingQueue、LinkedBlockingQueue以及DelayQueue为重点来分析一下他们是如何实现的。
1、Conditon接口在AQS中的实现
在AQS中有一个内部类ConditonObject,它实现了Condition接口。我们主要就来看看await()和signal()方法的实现:
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public ConditionObject() { }
}
(1)await()方法—用于阻塞当前线程
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//调用了LockSupport.park(Thread)方法来阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
(2)signal()方法—用于唤醒处在AQS等待队列头部的线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取头部结点 ,结点中封装了等待中的线程
Node first = firstWaiter;
if (first != null)
//如果结点不为null,那就唤醒结点中处于等待中的线程
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//最终还是调用了LockSupport.unpark(Tread)唤醒线程
LockSupport.unpark(node.thread);
return true;
}
2、ArrayListBlockingQueue
(1)参数以及构造方法:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/** 拿数据的索引,在take()/poll()/peek()/remove()方法中使用*/
int takeIndex;
/** 添加数据的索引,在put()/offer()/add()方法中使用*/
int putIndex;
/** 元素个数 */
int count;
/** 可重入锁*/
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
//默认构造非公平的阻塞队列
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化数组,容量是capacity
this.items = new Object[capacity];
//锁,传入false就是非公平锁;传入true就是公平锁
lock = new ReentrantLock(fair);
//初始化非空等待队列
notEmpty = lock.newCondition();
//出初始化非满等待队列
notFull = lock.newCondition();
}
}
(2)添加元素的原理—add/offer/put
add方法最终调用的还是offer方法,offer方法最终调用了enqueue(E x)方法。enqueue(E x)方法内部通过putIndex索引直接将元素添加到数组items中,这里可能会疑惑的是当putIndex索引大小等于数组长度时,需要将putIndex重新设置为0,这是因为当前队列执行元素获取时总是从队列头部获取,而添加元素从中从队列尾部获取所以当队列索引(从0开始)与数组长度相等时,下次我们就需要从数组头部开始添加了
//offer方法的实现
public boolean offer(E e) {
checkNotNull(e);
//首先获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列满了之后就返回false
if (count == items.length)
return false;
else {
//队列还有空间就调用enqueue()方法添加
enqueue(e);
return true;
}
} finally {
//解锁
lock.unlock();
}
}
//enqueue方法的实现——入队操作
private void enqueue(E x) {
final Object[] items = this.items;
//putIndex索引就是下一元素应该放入的索引位置,这里直接把元素放入数组putIndex处
items[putIndex] = x;
//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
//容量自加
count++;
//唤醒在等待取元素的线程
notEmpty.signal();
}
接下来我们看一下put方法的实现:
put方法是一个阻塞的方法,如果队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操作。但如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。
//put方法的实现
public void put(E e) throws InterruptedException {
checkNotNull(e);
//获取锁,可以响应中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//当队列元素个数与数组长度相等时,无法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中等待唤醒
notFull.await();
enqueue(e);
} finally {
//解锁
lock.unlock();
}
}
offer,add在正常情况下都是无阻塞的添加,而put方法是阻塞添加。这就是阻塞队列的添加过程。说白了就是当队列满时通过条件对象Condtion来阻塞当前调用put方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况,一是,队列已满,那么新到来的put线程将添加到notFull的条件队列中等待,二是,有移除线程执行移除操作,移除成功同时唤醒put线程,如下图所示:

poll方法,该方法获取并移除此队列的头元素,若队列为空,则返回 null
public E poll() {
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//非阻塞获取队列头部元素,如果队列是空的就返回null
return (count == 0) ? null : dequeue();
} finally {
//解锁
lock.unlock();
}
}
//出队操作
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //由于是数组,可以直接通过takeIndex索引获取数据
items[takeIndex] = null; //表示把元素移除了
//takeIndex索引自加,如果takeIndex跑到数组尾部了,就初始成0
if (++takeIndex == items.length)
takeIndex = 0;
count--; //count初始为0
if (itrs != null)
//更新迭代器中的元素数据
itrs.elementDequeued();
notFull.signal(); //把添加元素的线程唤醒
return x; //返回元素
}
接下来在看看remove方法,它只会移除元素而不会返回元素,移除成功返回true,否则返回false。
//移除队列中的元素,但不返回该元素
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列不为空
if (count > 0) {
//获取putIndex
final int putIndex = this.putIndex;
//获取当前要被删除元素的索引
int i = takeIndex;
//执行循环查找要删除的元素
do {
//找到要删除的元素
if (o.equals(items[i])) {
removeAt(i); //删除索引i位置的元素
return true;
}
//当前删除索引执行加1后判断是否与数组长度相等
//若为true,说明索引已到数组尽头,将i设置为0
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
//移除数组中指定索引位置的元素
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
//移除时候唤醒等待添加元素的线程
notFull.signal();
}
3、LinkedBlockingQueue
(1)重要参数和构造方法
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//链表结点类型:单链表
static class Node<E> {
E item; //元素
Node<E> next; //下一个结点的引用
Node(E x) { item = x; }
}
//队列的容量大小
private final int capacity;
/** 统计元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger保证原子性*/
private final AtomicInteger count = new AtomicInteger();
/**链表头结点*/
transient Node<E> head;
/**链表尾结点*/
private transient Node<E> last;
/**获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notEmpty条件对象,当队列没有数据时用于挂起执行移除元素的线程 */
private final Condition notEmpty = takeLock.newCondition();
/** notFull条件对象,当队列数据已满时用于挂起执行添加元素的线程 */
private final Condition notFull = putLock.newCondition();
//LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,因此在使用的时候一定要指定大小
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//构造指定大小的单链表
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
//此时链表的长度不能超过capacity
this.capacity = capacity;
last = head = new Node<E>(null);
}
}
(2)添加元素—offer/put
offer方法和put的差别就是offer方法是一个非阻塞的方法,如果添加成功返回true,添加失败了就返回false,不会阻塞的等待。take方法是一个阻塞的方法,如果添加失败了,会阻塞着等待。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//队列中元素个数
final AtomicInteger count = this.count;
//队列满了,没法添加了直接返回false,不会阻塞
if (count.get() == capacity)
return false;
//队列没有满,可以添加
int c = -1;
//new一个Node
Node<E> node = new Node<E>(e);
//获取加入元素的锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//如果当前队列的元素个数小于指定的容量
if (count.get() < capacity) {
//入队新元素
enqueue(node);
//队列元素个数统计+1
c = count.getAndIncrement();
//如果添加新元素后队列大小小于指定容量就把其他执行添加元素的线程唤醒
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
//逻辑大致和offer类似,只是当队列满了之后会阻塞当前线程
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//对了满了,阻塞当前线程
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
//入队
private void enqueue(Node<E> node) {
//直接把新节点放到队列尾部即可
last = last.next = node;
}
(2)元素出队—poll()/take()
poll和take方法的区别主要还是前者在获取的时候如果发现队列是空的就会直接返回null不会阻塞,当时后者如果发现队列是空的就会阻塞当前线程。具体我们通过源码分析一下:
public E poll() {
//首先获取到当前队列的大小(元素个数)
final AtomicInteger count = this.count;
//如果队列是空的就直接返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//队列不为空
if (count.get() > 0) {
//从队列的头部获取到元素
x = dequeue();
//更新count
c = count.getAndDecrement();
if (c > 1)
//如果队列不空,唤醒其他等待获取元素的线程
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
/**
*逻辑大致和poll方法类似,只是当队列为空之后会阻塞当前线程
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列空了,阻塞当前线程
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//弹出链表中的第一个元素
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
4、DelayQueue
DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。
Leader/Followers模式:
- 有若干个线程(一般组成线程池)用来处理大量的事
- 有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠。
- 假如有事件需要处理,领导者会从追随者中指定一个新的领导者,自己去处理事件。
- 唤醒的追随者作为新的领导者等待事件的发生。
- 处理事件的线程处理完毕以后,就会成为追随者的一员,直到被唤醒成为领导者。
- 假如需要处理的事件太多,而线程数量不够(能够动态创建线程处理另当别论),则有的事件可能会得不到处理。
所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
(1)参数以及构造方法
// 可重入锁
private final transient ReentrantLock lock = new ReentrantLock();
// 存储队列元素的队列——优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//用于优化阻塞通知的线程元素leader,Leader/Followers模式
private Thread leader = null;
//用于实现阻塞和通知的Condition对象
private final Condition available = lock.newCondition();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
(2)offer()方法
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 如果原来队列为空,重置leader线程,通知available条件
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
//因为DelayQueue不限制长度,因此添加元素的时候不会因为队列已满产生阻塞,因此带有超时的offer方法的超时设置是不起作用的
public boolean offer(E e, long timeout, TimeUnit unit) {
// 和不带timeout的offer方法一样
return offer(e);
}
普通的poll()方法:如果延迟时间没有耗尽的话,直接返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
take()方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 如果队列为空,需要等待available条件被通知
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
// 如果延迟时间已到,直接返回第一个元素
if (delay <= 0)
return q.poll();
// leader线程存在表示有其他线程在等待,那么当前线程肯定需要等待
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
// 如果没有leader线程,设置当前线程为leader线程
// 尝试等待直到延迟时间耗尽(可能提前返回,那么下次
// 循环会继续处理)
try {
available.awaitNanos(delay);
} finally {
// 如果leader线程还是当前线程,重置它用于下一次循环。
// 等待available条件时,锁可能被其他线程占用从而导致
// leader线程被改变,所以要检查
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果没有其他线程在等待,并且队列不为空,通知available条件
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
5、阻塞队列的应用
阻塞队列更据不同的类型适用的场景很多,比如ArrayListBlockingQueue、LinkedBlockingQueue、SynchronousQueue可以作为线程池的任务队列(线程池的任务队列应该尽量使用有界队列);还比如DelayQueue可以用于实现缓存系统设计和定时任务调度的场景.....;除过这些,还有其他生产者消费者的应用场景都可以使用合适的阻塞队列实现业务逻辑。