DelayQueue的指南

评论 0 浏览 0 2017-05-14

1.概述

在这篇文章中,我们将研究来自java.util.concurrent包的DelayQueue 构造。这是一个阻塞队列,可用于生产者-消费者程序中。

它有一个非常有用的特性——当消费者想要从队列中取出一个元素时,他们只能在该特定元素的延迟到期时才取出它。

2.为 DelayQueue 中的元素实现延迟

我们要放入 DelayQueue 的每个元素都需要实现 Delayed 接口。假设我们要创建一个 DelayObject 类。该类的实例将放入DelayQueue。

我们将把String数据和delayInMilliseconds作为和参数传递给它的构造函数。

public class DelayObject implements Delayed {
    private String data;
    private long startTime;

    public DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }

我们正在定义一个 startTime – 这是该元素应该从队列中被消耗的时间。接下来,我们需要实现getDelay() 方法 – 它应该返回在给定时间单位内与此对象相关的剩余延迟。

因此,我们需要使用TimeUnit.convert() 方法来返回适当的时间单位中的剩余延迟:

@Override
public long getDelay(TimeUnit unit) {
    long diff = startTime - System.currentTimeMillis();
    return unit.convert(diff, TimeUnit.MILLISECONDS);
}

当消费者试图从队列中获取一个元素时,DelayQueue 将执行getDelay() 以找出该元素是否被允许从队列中返回。如果getDelay() 方法将返回0或负数,这意味着它可以从队列中取回。

我们还需要实现compareTo() 方法,因为DelayQueue中的元素将根据过期时间进行排序。将首先过期的项目被保存在队列的头部,过期时间最长的元素被保存在队列的尾部。

@Override
public int compareTo(Delayed o) {
    return Ints.saturatedCast(
      this.startTime - ((DelayObject) o).startTime);
}

3. DelayQueue 消费者和生产者

为了能够测试我们的DelayQueue,我们需要实现生产者和消费者逻辑。生产者类将队列、要生产的元素数量以及每个消息的延迟(以毫秒为单位)作为参数。

然后,当run()方法被调用时,它将元素放入队列中,并在每次放完后睡眠500毫秒。

public class DelayQueueProducer implements Runnable {
 
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToProduce;
    private Integer delayOfEachProducedMessageMilliseconds;

    // standard constructor

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToProduce; i++) {
            DelayObject object
              = new DelayObject(
                UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object: " + object);
            try {
                queue.put(object);
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

消费者的实现非常相似,但它也记录了被消费的消息的数量。

public class DelayQueueConsumer implements Runnable {
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToTake;
    public AtomicInteger numberOfConsumedElements = new AtomicInteger();

    // standard constructors

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) {
            try {
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

4.DelayQueue 使用测试

为了测试 DelayQueue,我们将创建一个生产者线程和一个消费者线程。

生产者将put() 两个对象放到队列中,延迟500毫秒。该测试断言消费者消费了两条消息:

@Test
public void givenDelayQueue_whenProduceElement
  _thenShouldConsumeAfterGivenDelay() throws InterruptedException {
    // given
    ExecutorService executor = Executors.newFixedThreadPool(2);
    
    BlockingQueue<DelayObject> queue = new DelayQueue<>();
    int numberOfElementsToProduce = 2;
    int delayOfEachProducedMessageMilliseconds = 500;
    DelayQueueConsumer consumer = new DelayQueueConsumer(
      queue, numberOfElementsToProduce);
    DelayQueueProducer producer = new DelayQueueProducer(
      queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

    // when
    executor.submit(producer);
    executor.submit(consumer);

    // then
    executor.awaitTermination(5, TimeUnit.SECONDS);
    executor.shutdown();
 
    assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}

我们可以观察到,运行这个程序将产生以下输出:

Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}

生产者放置对象,一段时间后,第一个延迟过期的对象被消耗掉。

第二个要素也出现了同样的情况。

5.消费者无法在规定时间内消费

假设我们有一个生产者,正在生产一个将在10秒内过期的元素

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
  queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

我们将开始我们的测试,但它将在5秒后终止。由于 DelayQueue的特性,消费者将不能从队列中消费消息,因为该元素还没有过期。

executor.submit(producer);
executor.submit(consumer);

executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);

请注意,消费者的numberOfConsumedElements的值等于零。

6.生产一个立即失效的元素

Delayed message getDelay()方法的实现返回一个负数,这意味着给定的元素已经过期。在这种情况下,生产者将立即消耗该元素。

我们可以测试一下生产一个具有负延迟的元素的情况。

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

当我们启动测试用例时,消费者将立即消费该元素,因为它已经过期了。

executor.submit(producer);
executor.submit(consumer);

executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);

7.结语

在这篇文章中,我们看的是来自java.util.concurrent包的DelayQueue 构造。

我们实现了一个延迟元素,该元素从队列中产生和消费。

我们利用我们对DelayQueue的实现来消耗已经过期的元素。

所有这些例子和代码片段的实现都可以在GitHub项目中找到--这是一个Maven项目,所以应该很容易导入和运行,因为它是一个Maven项目。

最后更新2023-02-28
0 个评论
标签