Java SynchronousQueue的指南
1.概述
在这篇文章中,我们将研究来自java.util.concurrent 包的SynchronousQueue。
简单地说,这种实现方式使我们能够以线程安全的方式在线程之间交换信息。
2.API概述
SynchronousQueue 仅有两个支持的操作。take() 和put(),而且这两个操作都是阻塞的。
例如,当我们想向队列添加一个元素时,我们需要调用put() 方法。该方法将被阻塞,直到其他某个线程调用take() 方法,表示它已经准备好获取一个元素。
尽管SynchronousQueue有一个队列的接口,但我们应该把它看作是两个线程之间的单个元素的交换点,其中一个线程正在传递一个元素,而另一个线程正在获取该元素。
3.使用共享变量实现切换
为了了解为什么SynchronousQueue 可以如此有用,我们将使用两个线程之间的共享变量实现一个逻辑,接下来,我们将使用SynchronousQueue重写该逻辑,使我们的代码更简单,更易读。
假设我们有两个线程 – 一个生产者和一个消费者 – 当生产者在设置一个共享变量的值时,我们要向消费者线程发出信号。接下来,消费者线程将从共享变量中获取一个值。
我们将使用CountDownLatch来协调这两个线程,以防止出现消费者访问一个尚未设置的共享变量的值的情况。
我们将定义一个共享状态变量和一个CountDownLatch,它将被用于协调处理。
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
生产者将保存一个随机的整数到sharedState 变量中,并在countDownLatch上执行countDown() 方法,向消费者发出信号,表示它可以从sharedState:中获取一个值。
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
sharedState.set(producedElement);
countDownLatch.countDown();
};
消费者将使用await() 方法来等待countDownLatch。当生产者发出信号说该变量已被设置,消费者将从sharedState:中获取它。
Runnable consumer = () -> {
try {
countDownLatch.await();
Integer consumedElement = sharedState.get();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
最后但并非最不重要的是,让我们开始我们的计划。
executor.execute(producer);
executor.execute(consumer);
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);
它将产生以下输出结果。
Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point
我们可以看到,为了实现在两个线程之间交换一个元素这样一个简单的功能,需要很多代码。在下一节中,我们将尝试让它变得更好。
4.使用SynchronousQueue实现交接
现在让我们实现与上一节相同的功能,但使用SynchronousQueue。它有双重效果,因为我们可以用它来交换线程间的状态,并协调该动作,这样我们就不需要使用除SynchronousQueue.以外的任何东西了。
首先,我们将定义一个队列。
ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
生产者将调用一个put()方法,该方法将被阻塞,直到其他线程从队列中获取一个元素。
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
try {
queue.put(producedElement);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
消费者将简单地使用take()方法来检索该元素。
Runnable consumer = () -> {
try {
Integer consumedElement = queue.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
下一步,我们将开始我们的程序。
executor.execute(producer);
executor.execute(consumer);
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);
它将产生以下输出结果。
Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point
我们可以看到,一个同步队列被用作线程之间的交换点,这比之前的例子要好得多,也更容易理解,因为之前的例子将共享状态与CountDownLatch.一起使用。
5.总结
在这个快速教程中,我们看了SynchronousQueue结构。我们创建了一个使用共享状态在两个线程之间交换数据的程序,然后重写了该程序以利用SynchronousQueue 结构。这作为一个交换点,协调了生产者和消费者线程。
所有这些例子和代码片段的实现都可以在GitHub项目中找到--这是一个Maven项目,所以应该很容易导入和运行,就像现在一样。