Java的ConcurrentMap 指南
一、概述
Maps 自然是使用最广泛的 Java 集合样式之一。
而且,重要的是,HashMap 不是线程安全的实现,而 Hashtable 确实通过以下方式提供线程安全同步操作。
尽管 Hashtable 是线程安全的,但效率不是很高。另一个完全同步的 Map, Collections.synchronizedMap,也没有表现出很高的效率。如果我们想要在高并发下具有高吞吐量的线程安全,这些实现不是可行的方法。
为了解决这个问题,Java 集合框架 在Java 1.5 中引入了ConcurrentMap。
以下讨论基于 Java 1.8。
2. ConcurrentMap
ConcurrentMap 是 Map 接口的扩展。它的目的是为解决吞吐量与线程安全之间的协调问题提供一个结构和指导。
通过覆盖多个接口默认方法,ConcurrentMap 为有效实现提供了指南,以提供线程安全和内存一致的原子操作。
多个默认实现被覆盖,禁用 null 键/值支持:
- getOrDefault
- forEach
- replaceAll
- computeIfAbsent
- computeIfPresent
- compute
- merge
以下 API 也被覆盖以支持原子性,没有默认接口实现:
- putIfAbsent
- remove
- replace(key, oldValue, newValue)
- replace(key, value)
其余动作直接继承,与Map基本一致。
3. ConcurrentHashMap
ConcurrentHashMap 是开箱即用的 ConcurrentMap 实现。
为了获得更好的性能,它在底层包含一个节点数组作为表桶(在 Java 8 之前用作表段),并且在更新期间主要使用 CAS 操作。
表桶在第一次插入时延迟初始化。每个桶都可以通过锁定桶中的第一个节点来独立锁定。读取操作不会阻塞,并且更新争用已降至最低。
所需的段数与访问表的线程数有关,因此大多数情况下每个段正在进行的更新不会超过一次。
在 Java 8 之前,所需的“段”数与访问表的线程数有关,因此大多数情况下每个段正在进行的更新不会超过一次。
这就是为什么与 HashMap 相比,构造函数提供了额外的 concurrencyLevel 参数来控制要使用的估计线程数:
public ConcurrentHashMap(
public ConcurrentHashMap(
int initialCapacity, float loadFactor, int concurrencyLevel)
其他两个参数:initialCapacity 和 loadFactor 的工作方式与 HashMap 完全相同.
但是,自 Java 8 起,构造函数的存在只是为了向后兼容:参数只能影响地图的初始大小。
3.1.线程安全
ConcurrentMap 保证多线程环境中键/值操作的内存一致性。
在一个线程中,将一个对象作为键或值放入ConcurrentMap的操作,发生在另一个线程中访问或移除该对象的操作之前。
为了确认,让我们看一下内存不一致的情况:
@Test
public void givenHashMap_whenSumParallel_thenError() throws Exception {
Map<String, Integer> map = new HashMap<>();
List<Integer> sumList = parallelSum100(map, 100);
assertNotEquals(1, sumList
.stream()
.distinct()
.count());
long wrongResultCount = sumList
.stream()
.filter(num -> num != 100)
.count();
assertTrue(wrongResultCount > 0);
}
private List<Integer> parallelSum100(Map<String, Integer> map,
int executionTimes) throws InterruptedException {
List<Integer> sumList = new ArrayList<>(1000);
for (int i = 0; i < executionTimes; i++) {
map.put("test", 0);
ExecutorService executorService =
Executors.newFixedThreadPool(4);
for (int j = 0; j < 10; j++) {
executorService.execute(() -> {
for (int k = 0; k < 10; k++)
map.computeIfPresent(
"test",
(key, value) -> value + 1
);
});
}
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
sumList.add(map.get("test"));
}
return sumList;
}
对于并行执行的每个 map.computeIfPresent 操作,HashMap 不会提供关于当前整数值应该是什么的一致视图,从而导致不一致和不良结果。
至于ConcurrentHashMap,我们可以得到一致且正确的结果:
@Test
public void givenConcurrentMap_whenSumParallel_thenCorrect()
throws Exception {
Map<String, Integer> map = new ConcurrentHashMap<>();
List<Integer> sumList = parallelSum100(map, 1000);
assertEquals(1, sumList
.stream()
.distinct()
.count());
long wrongResultCount = sumList
.stream()
.filter(num -> num != 100)
.count();
assertEquals(0, wrongResultCount);
}
3.2. Null 键/值
ConcurrentMap 提供的大多数 API 不允许 null 键或值,例如:
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
concurrentMap.put(null, new Object());
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
concurrentMap.put("test", null);
}
但是,对于compute* 和merge 操作,计算值可以是null,这表示键值映射如果存在则被删除,或者如果之前不存在则保持不存在。
@Test
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
Object oldValue = new Object();
concurrentMap.put("test", oldValue);
concurrentMap.compute("test", (s, o) -> null);
assertNull(concurrentMap.get("test"));
}
3.3.流支持
Java 8 还在 ConcurrentHashMap 中提供了 Stream 支持。
与大多数流方法不同,批量(顺序和并行)操作允许安全地并发修改。 ConcurrentModificationException 不会被抛出,这也适用于它的迭代器。与流相关,还添加了几个 forEach*、search 和 reduce* 方法以支持更丰富的遍历和 map-reduce 操作。
3.4.性能
在底层,ConcurrentHashMap 与 HashMap 有点相似,数据访问和更新基于哈希表(虽然更复杂)。
当然,ConcurrentHashMap 在数据检索和更新的大多数并发情况下应该会产生更好的性能。
让我们为 get 和 put 性能编写一个快速的微型基准测试,并将其与 Hashtable 和 Collections.synchronizedMap 进行比较,在 4 个线程中运行这两个操作 500,000 次。
@Test
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster()
throws Exception {
Map<String, Object> hashtable = new Hashtable<>();
Map<String, Object> synchronizedHashMap =
Collections.synchronizedMap(new HashMap<>());
Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();
long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
long syncHashMapAvgRuntime =
timeElapseForGetPut(synchronizedHashMap);
long concurrentHashMapAvgRuntime =
timeElapseForGetPut(concurrentHashMap);
assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}
private long timeElapseForGetPut(Map<String, Object> map)
throws InterruptedException {
ExecutorService executorService =
Executors.newFixedThreadPool(4);
long startTime = System.nanoTime();
for (int i = 0; i < 4; i++) {
executorService.execute(() -> {
for (int j = 0; j < 500_000; j++) {
int value = ThreadLocalRandom
.current()
.nextInt(10000);
String key = String.valueOf(value);
map.put(key, value);
map.get(key);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
return (System.nanoTime() - startTime) / 500_000;
}
请记住,微基准仅关注单一场景,并不总是能很好地反映现实世界的性能。
也就是说,在具有平均开发系统的 OS X 系统上,我们看到了 100 次连续运行的平均样本结果(以纳秒为单位):
Hashtable: 1142.45
SynchronizedHashMap: 1273.89
ConcurrentHashMap: 230.2
在多线程环境中,多个线程需要访问一个公共的 Map,ConcurrentHashMap 显然更可取。
但是,当 Map 只能由单个线程访问时,HashMap 可能是更好的选择,因为它的简单性和可靠的性能。
3.5.陷阱
检索操作通常不会在 ConcurrentHashMap 中阻塞,并且可能与更新操作重叠。因此,为了获得更好的性能,它们仅反映最近完成的更新操作的结果,如 官方 Javadoc。
还有其他几个事实需要牢记:
- 包括 size、isEmpty 和 containsValue 在内的聚合状态方法的结果通常仅在地图未在其他线程中进行并发更新时才有用:
@Test
public void givenConcurrentMap_whenUpdatingAndGetSize_thenError()
throws InterruptedException {
Runnable collectMapSizes = () -> {
for (int i = 0; i < MAX_SIZE; i++) {
mapSizes.add(concurrentMap.size());
}
};
Runnable updateMapData = () -> {
for (int i = 0; i < MAX_SIZE; i++) {
concurrentMap.put(String.valueOf(i), i);
}
};
executorService.execute(updateMapData);
executorService.execute(collectMapSizes);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
assertNotEquals(MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
assertEquals(MAX_SIZE, concurrentMap.size());
}
如果并发更新受到严格控制,聚合状态仍然是可靠的。
尽管这些聚合状态方法不能保证实时准确性,但它们可能足以用于监控或估计目的。
请注意,ConcurrentHashMap 的 size() 的用法应替换为 mappingCount(),因为后一种方法返回一个 long 计数,尽管它们在深处是基于相同的估计。
- hashCode 很重要:请注意,使用许多具有完全相同的 hashCode() 的键肯定会降低任何哈希表的性能。
为了改善键可比较时的影响,ConcurrentHashMap 可以使用键之间的比较顺序来帮助打破关系。不过,我们应该尽可能避免使用相同的 hashCode()。
- 迭代器仅设计用于单线程,因为它们提供弱一致性而不是快速失败遍历,并且它们永远不会抛出 ConcurrentModificationException。
- 默认初始表容量为16,并根据指定的并发级别进行调整:
public ConcurrentHashMap(
int initialCapacity, float loadFactor, int concurrencyLevel) {
//...
if (initialCapacity < concurrencyLevel) {
initialCapacity = concurrencyLevel;
}
//...
}
- 重新映射函数的注意事项:尽管我们可以使用提供的compute 和merge* 方法进行重新映射操作,但我们应该保持它们快速、简短和简单,并专注于当前映射到避免意外阻塞。
- ConcurrentHashMap 中的键未按排序顺序排列,因此对于需要排序的情况,ConcurrentSkipListMap 是合适的选择。
4. ConcurrentNavigableMap
对于需要对键进行排序的情况,我们可以使用 ConcurrentSkipListMap,TreeMap 的并发版本。
作为 ConcurrentMap 的补充,ConcurrentNavigableMap 支持其Key的全排序(默认为升序),并且可以并发导航。为了并发兼容性,返回地图视图的方法被覆盖:
- subMap
- headMap
- tailMap
- subMap
- headMap
- tailMap
- descendingMap
keySet() 视图的迭代器和拆分器通过弱内存一致性得到增强:
- navigableKeySet
- keySet
- descendingKeySet
5.ConcurrentSkipListMap
之前,我们介绍了 NavigableMap 接口及其实现 TreeMap。 ConcurrentSkipListMap 可以看作是 TreeMap 的可扩展并发版本。
实际上,Java 中并没有红黑树的并发实现。 SkipLists 的并发变体在 ConcurrentSkipListMap 中实现,提供containsKey、get、put 和 remove 操作及其变体的预期平均 log(n) 时间成本。
除了 TreeMap 的特性外,键的插入、删除、更新和访问操作都由线程安全保证。以下是同时导航时与 TreeMap 的比较:
@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect()
throws InterruptedException {
NavigableMap<Integer, Integer> skipListMap
= new ConcurrentSkipListMap<>();
int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
assertEquals(10000 * 4, count);
}
@Test
public void givenTreeMap_whenNavConcurrently_thenCountError()
throws InterruptedException {
NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
assertNotEquals(10000 * 4, count);
}
private int countMapElementByPollingFirstEntry(
NavigableMap<Integer, Integer> navigableMap,
int elementCount,
int concurrencyLevel) throws InterruptedException {
for (int i = 0; i < elementCount * concurrencyLevel; i++) {
navigableMap.put(i, i);
}
AtomicInteger counter = new AtomicInteger(0);
ExecutorService executorService
= Executors.newFixedThreadPool(concurrencyLevel);
for (int j = 0; j < concurrencyLevel; j++) {
executorService.execute(() -> {
for (int i = 0; i < elementCount; i++) {
if (navigableMap.pollFirstEntry() != null) {
counter.incrementAndGet();
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
return counter.get();
}
对幕后性能问题的完整解释超出了本文的范围。详细信息可以在ConcurrentSkipListMap 的 Javadoc 中找到,它位于src.zip 文件中的java/util/concurrent 下。
六,结论
在本文中,我们主要介绍了 ConcurrentMap 接口和 ConcurrentHashMap 的特性,并介绍了 ConcurrentNavigableMap 需要按键排序。
可以在 在 GitHub 项目中找到。