DelayQueue的指南
1.概述
在这篇文章中,我们将研究来自java.util.concurrent包的DelayQueue 构造。这是一个阻塞队列,可用于生产者-消费者程序中。
它有一个非常有用的特性——当消费者想要从队列中取出一个元素时,他们只能在该特定元素的延迟到期时才取出它。
2.为 DelayQueue 中的元素实现延迟
我们要放入 DelayQueue 的每个元素都需要实现 Delayed 接口。假设我们要创建一个 DelayObject 类。该类的实例将放入DelayQueue。
我们将把String数据和delayInMilliseconds作为和参数传递给它的构造函数。
public class DelayObject implements Delayed {
private String data;
private long startTime;
public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
我们正在定义一个 startTime – 这是该元素应该从队列中被消耗的时间。接下来,我们需要实现getDelay() 方法 – 它应该返回在给定时间单位内与此对象相关的剩余延迟。
因此,我们需要使用TimeUnit.convert() 方法来返回适当的时间单位中的剩余延迟:。
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
当消费者试图从队列中获取一个元素时,DelayQueue 将执行getDelay() 以找出该元素是否被允许从队列中返回。如果getDelay() 方法将返回0或负数,这意味着它可以从队列中取回。
我们还需要实现compareTo() 方法,因为DelayQueue中的元素将根据过期时间进行排序。将首先过期的项目被保存在队列的头部,过期时间最长的元素被保存在队列的尾部。
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
3. DelayQueue 消费者和生产者
为了能够测试我们的DelayQueue,我们需要实现生产者和消费者逻辑。生产者类将队列、要生产的元素数量以及每个消息的延迟(以毫秒为单位)作为参数。
然后,当run()方法被调用时,它将元素放入队列中,并在每次放完后睡眠500毫秒。
public class DelayQueueProducer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToProduce;
private Integer delayOfEachProducedMessageMilliseconds;
// standard constructor
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
消费者的实现非常相似,但它也记录了被消费的消息的数量。
public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToTake;
public AtomicInteger numberOfConsumedElements = new AtomicInteger();
// standard constructors
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4.DelayQueue 使用测试
为了测试 DelayQueue,我们将创建一个生产者线程和一个消费者线程。
生产者将put() 两个对象放到队列中,延迟500毫秒。该测试断言消费者消费了两条消息:
@Test
public void givenDelayQueue_whenProduceElement
_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
我们可以观察到,运行这个程序将产生以下输出:
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
生产者放置对象,一段时间后,第一个延迟过期的对象被消耗掉。
第二个要素也出现了同样的情况。
5.消费者无法在规定时间内消费
假设我们有一个生产者,正在生产一个将在10秒内过期的元素。
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
我们将开始我们的测试,但它将在5秒后终止。由于 DelayQueue的特性,消费者将不能从队列中消费消息,因为该元素还没有过期。
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
请注意,消费者的numberOfConsumedElements的值等于零。
6.生产一个立即失效的元素
当Delayed message getDelay()方法的实现返回一个负数,这意味着给定的元素已经过期。在这种情况下,生产者将立即消耗该元素。
我们可以测试一下生产一个具有负延迟的元素的情况。
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
当我们启动测试用例时,消费者将立即消费该元素,因为它已经过期了。
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
7.结语
在这篇文章中,我们看的是来自java.util.concurrent包的DelayQueue 构造。
我们实现了一个延迟元素,该元素从队列中产生和消费。
我们利用我们对DelayQueue的实现来消耗已经过期的元素。
所有这些例子和代码片段的实现都可以在GitHub项目中找到--这是一个Maven项目,所以应该很容易导入和运行,因为它是一个Maven项目。