Java TransferQueue的指南
1.概述
在这篇文章中,我们将看看来自标准java.util.concurrent包的TransferQueue 构造。
简单地说,这个队列允许我们按照生产者-消费者模式创建程序,并协调从生产者到消费者之间传递的消息。
该实现实际上与BlockingQueue – 类似,但给我们提供了实现一种背压的新能力。这意味着,当生产者使用transfer()方法向消费者发送消息时,生产者将保持阻塞状态,直到消息被消费。
2.一个生产者 – 零个消费者
让我们测试一下来自TransferQueue –的transfer()方法;预期的行为是生产者将被阻塞,直到消费者使用take()方法从队列中收到消息为止。
为了实现这一目标,我们将创建一个有一个生产者但没有消费者的程序。生产者线程对transfer() 的第一次调用将无限期地阻塞,因为我们没有任何消费者可以从队列中获取该元素。
让我们看看Producer 类是怎样的吧。
class Producer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private Integer numberOfMessagesToProduce;
public AtomicInteger numberOfProducedMessages
= new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToProduce; i++) {
try {
boolean added
= transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
if(added){
numberOfProducedMessages.incrementAndGet();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// standard constructors
}
我们将一个TransferQueue的实例和一个我们想给生产者的名字以及应该被转移到队列中的元素的数量一起传递给构造函数。
请注意,我们正在使用tryTransfer()方法,有一个给定的超时。我们在等待四秒,如果生产者在给定的超时时间内不能传输消息,它就会返回false,并继续处理下一个消息。生产者有一个numberOfProducedMessages 变量来记录生产了多少消息。
接下来,让我们看一下Consumer类。
class Consumer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private int numberOfMessagesToConsume;
public AtomicInteger numberOfConsumedMessages
= new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToConsume; i++) {
try {
String element = transferQueue.take();
longProcessing(element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(String element)
throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(500);
}
// standard constructors
}
它与生产者类似,但我们通过使用take() 方法从队列中接收元素。我们还通过使用longProcessing() 方法来模拟一些长期运行的动作,其中我们正在增加numberOfConsumedMessages 变量,该变量是收到的消息的一个计数器。
现在,让我们只用一个生产者来开始我们的程序。
@Test
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
// when
exService.execute(producer);
// then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}
我们想向队列发送三个元素,但是生产者在第一个元素上被阻塞了,而且没有消费者从队列中获取该元素.我们正在使用tryTransfer() 方法 ,它将阻塞直到消息被消耗或达到超时。超时后,它将返回false,表示传输失败,它将尝试传输下一条。这就是前面例子的输出。
Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...
3.一个生产者 – 一个消费者
让我们来测试一下有一个生产者和一个消费者的情况。
@Test
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
Consumer consumer = new Consumer(transferQueue, "1", 3);
// when
exService.execute(producer);
exService.execute(consumer);
// then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer.numberOfProducedMessages.intValue(), 3);
assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}
TransferQueue被用作交换点,在消费者从队列中消耗一个元素之前,生产者不能继续向其添加另一个元素。我们来看看程序的输出。
Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2
我们看到,由于TransferQueue的规范,从队列中生产和消费元素是有顺序的。
4.许多生产者 – 许多消费者
在最后一个例子中,我们将考虑有多个消费者和多个生产者。
@Test
public void whenMultipleConsumersAndProducers_thenProcessAllMessages()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(3);
Producer producer1 = new Producer(transferQueue, "1", 3);
Producer producer2 = new Producer(transferQueue, "2", 3);
Consumer consumer1 = new Consumer(transferQueue, "1", 3);
Consumer consumer2 = new Consumer(transferQueue, "2", 3);
// when
exService.execute(producer1);
exService.execute(producer2);
exService.execute(consumer1);
exService.execute(consumer2);
// then
exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}
在这个例子中,我们有两个消费者和两个生产者。当程序开始时,我们看到两个生产者可以生产一个元素,之后,他们将阻塞,直到其中一个消费者从队列中获取该元素。
Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2
5.总结
在这篇文章中,我们看的是来自java.util.concurrent包的TransferQueue 构造。
我们看到了如何使用该结构来实现生产者-消费者程序。我们使用transfer() 方法来创建一种背压形式,在消费者从队列中检索到一个元素之前,生产者不能发布另一个元素。
TransferQueue 在我们不希望过度生产的生产者将消息淹没队列时非常有用,从而导致 OutOfMemory 错误。在这样的设计中,消费者将决定生产者生产消息的速度。
所有这些例子和代码片段都可以在GitHub上找到--这是一个Maven项目,所以应该很容易导入和运行,就像现在这样。