java.util.concurrent.BlockingQueue的指南
1.概述
在这篇文章中,我们将看一下解决并发生产者-消费者问题的最有用的结构之一java.util.concurrent。我们将看看BlockingQueue接口的一个API,以及该接口的方法如何使编写并发程序变得更容易。
在文章的后面,我们将展示一个简单程序的例子,该程序有多个生产者线程和多个消费者线程。
2.BlockingQueue类型
我们可以区分两种类型的BlockingQueue。
- 无界队列 – 几乎可以无限地增长
- 有界队列 – 定义了最大容量
2.1.无界队列
创建无界队列是很简单的。
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
blockingQueue 的容量将被设置为Integer.MAX_VALUE。所有向无界队列添加元素的操作都不会阻塞,因此它可能会增长到一个非常大的尺寸。
使用无界 BlockingQueue 设计生产者-消费者程序时,最重要的是消费者应该能够在生产者向队列中添加消息时尽快消费消息。否则,内存会被填满,我们会得到一个OutOfMemory 的异常。
2.2.有边界的队列
第二种类型的队列是有界队列。我们可以通过将容量作为参数传递给构造函数来创建这种队列。
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
这里我们有一个blockingQueue ,它的容量等于10。这意味着,当生产者试图将一个元素添加到一个已经满了的队列中时,根据用来添加它的方法(offer(), add() 或 put()),它将阻塞,直到插入对象的空间变得可用。否则,这些操作将失败。
使用有界队列是设计并发程序的好方法,因为当我们向一个已经满了的队列插入一个元素时,该操作需要等待,直到消费者赶上并在队列中腾出一些空间。它为我们提供了节流,而不需要我们做任何努力。
3. 阻塞队列 API
BlockingQueue interface 接口中有两种类型的方法 – 负责将元素添加到队列的方法和检索这些元素的方法。在队列满/空的情况下,这两类方法的行为是不同的。
3.1.添加元素
- add() – 如果插入成功则返回 true,否则抛出 IllegalStateException
- put() – 将指定元素插入队列,必要时等待空闲槽
- offer() – 返回 true 如果插入成功,否则false
- offer(E e, long timeout, TimeUnit unit) – 尝试将元素插入队列中,并在指定的超时时间内等待可用槽。
3.2.检索元素
- take() – 等待队列的头部元素并将其移除。如果队列是空的,它就阻塞并等待一个元素的出现。
- poll(long timeout, TimeUnit unit) – 检索并移除队列的头部,如有必要,等待指定的等待时间以使元素可用。 超时后返回null。
在构建生产者-消费者程序时,这些方法是来自BlockingQueen接口中最重要的构建块。
4.多线程的生产者-消费者实例
让我们创建一个由两部分组成的程序–一个生产者和一个消费者。
生产者将产生一个从0到100的随机数,并将该数字放入一个BlockingQueue。我们将有4个生产者线程,并使用put() 方法来阻塞,直到队列中有可用空间。
需要记住的是,我们需要阻止我们的消费者线程无限期地等待一个元素出现在队列中。
从生产者到消费者发出信号说没有更多的消息要处理的一个好技术是发送一个特殊的消息,称为毒丸。我们需要发送尽可能多的毒丸,因为我们有消费者。然后,当一个消费者从队列中取出那个特殊的毒丸消息时,它将优雅地完成执行。
让我们来看看一个生产者类的情况。
public class NumbersProducer implements Runnable {
private BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;
public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
}
}
}
我们的生产者构造函数将BlockingQueue 作为参数,该参数用于协调生产者和消费者之间的处理。我们看到方法generateNumbers() 将把100个元素放入队列。它还需要毒丸消息,以知道当执行结束时,什么类型的消息必须被放入队列。该消息需要将 poisonPillPerProducer 次放入队列。
每个消费者将从BlockingQueue 使用take() 方法获取一个元素,所以它将阻塞,直到队列中有一个元素。在从队列中取出一个Integer 后,它会检查该消息是否是一个毒药,如果是,那么一个线程的执行就结束。否则,它将在标准输出端打印出结果和当前线程的名称。
这将使我们深入了解我们的消费者的内部运作。
public class NumbersConsumer implements Runnable {
private BlockingQueue<Integer> queue;
private final int poisonPill;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
System.out.println(Thread.currentThread().getName() + " result: " + number);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
需要注意的是队列的使用。和生产者构造函数一样,一个队列被作为一个参数传递。我们可以这样做,因为BlockingQueue 可以在线程之间共享而不需要任何显式同步。
现在我们有了生产者和消费者,我们可以开始我们的程序。我们需要定义队列的容量,我们把它设置为100个元素。
我们希望有 4 个生产者线程,而消费者线程的数量将等于可用处理器的数量:
int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
BlockingQueue 是使用有容量的构造创建的。我们要创建4个生产者和N个消费者。我们指定我们的毒丸消息为Integer.MAX_VALUE,因为在正常工作条件下,这样的值永远不会被我们的生产者发送。这里最重要的是,BlockingQueue 被用来协调它们之间的工作。
当我们运行该程序时,4个生产者线程将把随机的Integers 放入一个BlockingQueue ,消费者将从队列中获取这些元素。每个线程将把线程的名字和结果一起打印到标准输出。
5.总结
本文展示了BlockingQueue 的实际用途,并解释了用于添加和检索其中的元素的方法。此外,我们还展示了如何使用BlockingQueue 构建一个多线程的生产者-消费者程序,以协调生产者和消费者之间的工作。
所有这些例子和代码片段的实现都可以在GitHub项目中找到--这是一个基于Maven的项目,所以应该很容易按原样导入和运行。