以下显示了用于简单并发HashMap的算法,对于写操作,它的可扩展性比更好。快速,复杂且相当复杂。而且,我需要vmlens(一种用于测试并发Java的工具)的类似数据结构。 java.util.concurrent.ConcurrentHashMapjava.util.concurrent.ConcurrentHashMap
java.util.concurrent.ConcurrentHashMapjava.util.concurrent.ConcurrentHashMap
我无法使用,因为我也想 使用此工具跟踪其内部。java.util.concurrent.ConcurrentHashMapjava.util.concurrent.ConcurrentHashMap
因此,我决定只实施最低限度的方法computeIfAbsent。这导致了一个简单但可扩展的并发HashMap。
computeIfAbsent
您可以在GitHub上从GitHub下载源代码。
算法 该算法使用带有线性探测的开放式寻址。它基于Jeff Preshing的工作,而Jeff Preshing也是基于Cliff Click博士的工作。该GitHub存储库包含来自Cliff Click博士的HashMap的源代码。
Jeff Preshing
开放式寻址意味着键值对存储在单个数组中。存储键值对的索引由哈希码给定数组大小。
线性探测意味着,如果此数组元素已被占用,我们将使用下一个数组元素。依此类推,直到找到一个空插槽。这是在两个填充的插槽之后插入的示例:
HashMap的实现图
为了使算法并发,我们需要使用填充空数组元素compareAndSet。通过使用compareAndSet,我们检查null和新元素的设置是否为atomic。如果compareAndSet成功,则我们已经成功插入了一个新元素。
如果失败,我们需要检查其他线程插入的密钥是否等于我们的密钥。如果是,则完成操作,并可以返回此元素的值。如果没有,我们需要找到一个新的空插槽,然后重试。
这是该算法的源代码:
private static final VarHandle ARRAY = MethodHandles.arrayElementVarHandle(KeyValue[].class); volatile KeyValue[] currentArray; public V computeIfAbsent(K key, Function<? super K, ? extends V> compute) { KeyValue[] local = currentArray; int hashCode = hashAndEquals.hashForKey(key); // the array position is given by hashCode modulo array size. Since // the array size is a power of two, we can use & instead of %. int index = (local.length - 1) & hashCode; int iterations = 0; KeyValue created = null; KeyValue current = tabAt(local, index); // fast path for reading if (current != null) { if (hashAndEquals.keyEquals(current.key, key)) { return (V) current.value; } else if (current.key == MOVED_NULL_KEY) { return (V) insertDuringResize(key, compute); } } while (true) { if (current == null) { if (created == null) { created = new KeyValue(key, compute.apply(key)); } // use compareAndSet to set the array element if it is null if (casTabAt(local, index, created)) { if (((iterations) << resizeAtPowerOfTwo) > local.length) { resize(local.length, iterations); } // if successful we have inserted a new value return (V) created.value; } // if not we need to check if the other key is the same // as our key current = tabAt(local, index); if (hashAndEquals.keyEquals(current.key, key)) { return (V) current.value; } else if (current.key == MOVED_NULL_KEY) { return (V) insertDuringResize(key, compute); } } index++; iterations++; if (index == local.length) { index = 0; } if ((iterations << resizeAtPowerOfTwo) > local.length) { resize(local.length, iterations); return computeIfAbsent(key, compute); } current = tabAt(local, index); if (current != null) { if (hashAndEquals.keyEquals(current.key, key)) { return (V) current.value; } else if (current.key == MOVED_NULL_KEY) { return (V) insertDuringResize(key, compute); } } } } private static final KeyValue tabAt(KeyValue[] tab, int i) { return (KeyValue) ARRAY.getVolatile(tab, i); } private static final boolean casTabAt(KeyValue[] tab, int i, KeyValue newValue) { return ARRAY.compareAndSet(tab, i, null, newValue); }
为什么起作用? 诀窍是,如果线程读取了已填充的数组元素,则该数组元素将保持填充状态,而键将保持不变。因此,唯一需要检查另一个线程是否修改了已读取值的时间是当我们读取null时。我们通过使用compareAndSet上面概述的方法来做到这一点。
因此,我们的算法是可行的,因为我们不允许删除键并且键是不可变的。
调整大小 在调整大小期间,我们需要确保新添加的元素不会丢失。因此,我们需要在调整大小期间阻止当前数组以进行更新。我们通过将每个空数组元素设置为一个特殊值来实现此目的MOVED_NULL_KEY。如果线程看到这样的值,则表明正在运行调整大小。并会等待插入,直到调整大小为止。
调整哈希图的大小包括以下步骤:
MOVED_NULL_KEY
这是源代码:
private static final Object MOVED_NULL_KEY = new Object(); private final Object resizeLock = new Object(); private void resize(int checkedLength, int intervall) { synchronized (resizeLock) { if (currentArray.length > checkedLength) { return; } resizeRunning = true; // Set all null values in the current array to the special value for (int i = 0; i < currentArray.length; i++) { if (tabAt(currentArray, i) == null) { casTabAt(currentArray, i, MOVED_KEY_VALUE); } } int arrayLength = Math.max(currentArray.length * 2, tableSizeFor(intervall * newMinLength + 2)); // Create a new array KeyValue[] newArray = new KeyValue[arrayLength]; // Copy all values from the current array to the new array for (int i = 0; i < currentArray.length; i++) { KeyValue current = tabAt(currentArray, i); if (current != MOVED_KEY_VALUE) { int hashCode = hashAndEquals.hashForKey(current.key); int index = (newArray.length - 1) & hashCode; while (newArray[index] != null) { index++; if (index == newArray.length) { index = 0; } } newArray[index] = current; } } // Set the current array to the new array currentArray = newArray; resizeRunning = false; resizeLock.notifyAll(); } }
Benchmark Results 不同哈希映射的性能取决于特定的工作负载和哈希函数的质量。所以,把下面的结果放在一边。为了度量性能,我使用JMH调用了computeifassent方法,该方法使用随机键。以下是基准方法的源代码:
public static final int MAX_KEY = 10000000; public static final Function COMPUTE = new Function() { public Object apply(Object t) { return new Object(); } }; @Benchmark public Object computeIfAbsentHashMap() { int key = ThreadLocalRandom.current().nextInt(MAX_KEY); return computeIfAbsentHashMap.computeIfAbsent(key, COMPUTE); }
您可以在GitHub上从基准代码下载源代码。以下是Intel Xeon Platinum 8124M CPU @ 3.00GHz的结果,该处理器具有两个插槽,每个插槽具有18个内核,每个内核具有两个硬件线程。我使用Amazon Corretto 11作为JVM。
地图的大小类似于的大小ConcurrentHashMap。对于500万个随机密钥, 需要285兆字节,而新映射需要253兆字节。java.util.concurrent.ConcurrentHashMap
性能差异 如果另一个线程更新了一个空插槽,则新映射将重试以插入元素。使用阵列盒作为同步块的监视器。因此,一次只能有一个线程可以修改这些bin的内容。java.util.concurrent.ConcurrentHashMap
这会导致以下行为差异:新映射可能会多次调用callback方法compute。每个失败的更新都会导致方法调用。另一方面,调用最多只计算一次。java.util.concurrent.ConcurrentHashMap
结论 通过仅实现computeIfAbsent方法,我们可以编写并发哈希映射,该映射的写比例要好于。该算法非常简单,因为我们避免了密钥的删除和更改。我使用此映射将类和相关的元数据存储在一起。java.util.concurrent.ConcurrentHashMap
原文链接:http://codingdict.com