/**
 * Creates an empty queue whose backing priority queue is pre-sized from the
 * expected number of blocks ({@code maxSize / blockSize}, but never less than one).
 *
 * @param maxSize the target size of elements in the queue
 * @param blockSize expected average size of blocks
 */
public CachedEntryQueue(long maxSize, long blockSize) {
  // Estimate how many blocks will be held; a zero estimate is bumped to one.
  final int estimatedBlocks = (int) (maxSize / blockSize);
  final int initialSize = (estimatedBlocks == 0) ? 1 : estimatedBlocks;

  // Entries are ordered by their bucket entries, using the shared comparator.
  final Comparator<Map.Entry<BlockCacheKey, BucketEntry>> byBucketEntry =
      new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
        @Override
        public int compare(Map.Entry<BlockCacheKey, BucketEntry> left,
            Map.Entry<BlockCacheKey, BucketEntry> right) {
          return BucketEntry.COMPARATOR.compare(left.getValue(), right.getValue());
        }
      };

  this.maxSize = maxSize;
  cacheSize = 0;
  queue = MinMaxPriorityQueue.orderedBy(byBucketEntry).expectedSize(initialSize).create();
}
/**
 * Attempt to add the specified entry to this queue.
 * <p>
 * While the tracked size is below the max size the entry is always added.
 * Once the max size has been reached, the entry is only added when it compares
 * greater than the current head of the queue; the head is dropped only if the
 * tracked size would still exceed the max size without it. Otherwise, there is
 * no side effect of this call.
 *
 * @param entry a bucket entry with key to try to add to the queue
 */
public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) {
  // Below the target size: accept unconditionally.
  if (cacheSize < maxSize) {
    queue.add(entry);
    cacheSize += entry.getValue().getLength();
    return;
  }

  final BucketEntry head = queue.peek().getValue();
  if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) <= 0) {
    // Not ordered after the head: nothing to do.
    return;
  }

  // Tentatively account for swapping the head out for the new entry.
  cacheSize += entry.getValue().getLength();
  cacheSize -= head.getLength();
  if (cacheSize > maxSize) {
    // Still over the target without the head, so the head really goes.
    queue.poll();
  } else {
    // Room for both: undo the head's removal from the tracked size.
    cacheSize += head.getLength();
  }
  queue.add(entry);
}
/** * Evicts all blocks for a specific HFile. * <p> * This is used for evict-on-close to remove all blocks of a specific HFile. * * @return the number of blocks evicted */ @Override public int evictBlocksByHfileName(String hfileName) { // Copy the list to avoid ConcurrentModificationException // as evictBlockKey removes the key from the index Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName); if (keySet == null) { return 0; } int numEvicted = 0; List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet); for (BlockCacheKey key : keysForHFile) { if (evictBlock(key)) { ++numEvicted; } } return numEvicted; }
/** * Set up variables and get BucketCache and WriterThread into state where tests can manually * control the running of WriterThread and BucketCache is empty. * @throws Exception */ @Before public void setUp() throws Exception { // Arbitrary capacity. final int capacity = 16; // Run with one writer thread only. Means there will be one writer queue only too. We depend // on this in below. final int writerThreadsCount = 1; this.bc = new MockBucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount, capacity, null, 100/*Tolerate ioerrors for 100ms*/); assertEquals(writerThreadsCount, bc.writerThreads.length); assertEquals(writerThreadsCount, bc.writerQueues.size()); // Get reference to our single WriterThread instance. this.wt = bc.writerThreads[0]; this.q = bc.writerQueues.get(0); wt.disableWriter(); this.plainKey = new BlockCacheKey("f", 0); this.plainCacheable = Mockito.mock(Cacheable.class); assertThat(bc.ramCache.isEmpty(), is(true)); assertTrue(q.isEmpty()); }
@Override public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) { CacheablePair contentBlock = backingMap.get(key); if (contentBlock == null) { if (!repeat) stats.miss(caching); return null; } stats.hit(caching); // If lock cannot be obtained, that means we're undergoing eviction. try { contentBlock.recentlyAccessed.set(System.nanoTime()); synchronized (contentBlock) { if (contentBlock.serializedData == null) { // concurrently evicted LOG.warn("Concurrent eviction of " + key); return null; } return contentBlock.deserializer .deserialize(contentBlock.serializedData.asReadOnlyBuffer()); } } catch (Throwable t) { LOG.error("Deserializer threw an exception. This may indicate a bug.", t); return null; } }
/** * Cache the block with the specified key and buffer. First finds what size * SingleSlabCache it should fit in. If the block doesn't fit in any, it will * return without doing anything. * <p> * It is assumed this will NEVER be called on an already cached block. If that * is done, it is assumed that you are reinserting the same exact block due to * a race condition, and will throw a runtime exception. * * @param cacheKey block cache key * @param cachedItem block buffer */ public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem) { Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem .getSerializedLength()); this.requestStats.addin(cachedItem.getSerializedLength()); if (scacheEntry == null) { return; // we can't cache, something too big. } this.successfullyCachedStats.addin(cachedItem.getSerializedLength()); SingleSizeCache scache = scacheEntry.getValue(); /* * This will throw a runtime exception if we try to cache the same value * twice */ scache.cacheBlock(cacheKey, cachedItem); }
/**
 * Get the buffer of the block with the specified name.
 *
 * @param key block cache key
 * @param caching passed through to the hit/miss statistics and the delegate cache
 * @param repeat when true, a miss is not recorded in the statistics
 * @return buffer of specified block name, or null if not in cache
 */
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
  final SingleSizeCache owner = backingStore.get(key);
  final Cacheable contentBlock =
      (owner == null) ? null : owner.getBlock(key, caching, false);

  if (contentBlock != null) {
    stats.hit(caching);
    return contentBlock;
  }

  // Either no cache owns the key or the owner no longer has the block.
  if (!repeat) {
    stats.miss(caching);
  }
  return null;
}
/**
 * Creates an empty queue whose backing priority queue is pre-sized from the
 * expected number of blocks ({@code maxSize / blockSize}, but never less than one).
 *
 * @param maxSize the target size of elements in the queue
 * @param blockSize expected average size of blocks
 */
public CachedEntryQueue(long maxSize, long blockSize) {
  // Estimate how many blocks will be held; a zero estimate is bumped to one.
  final int estimatedBlocks = (int) (maxSize / blockSize);
  final int initialSize = (estimatedBlocks == 0) ? 1 : estimatedBlocks;

  // Entries are ordered by the natural ordering of their bucket entries.
  final Comparator<Map.Entry<BlockCacheKey, BucketEntry>> byBucketEntry =
      new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
        @Override
        public int compare(Map.Entry<BlockCacheKey, BucketEntry> left,
            Map.Entry<BlockCacheKey, BucketEntry> right) {
          return left.getValue().compareTo(right.getValue());
        }
      };

  this.maxSize = maxSize;
  cacheSize = 0;
  queue = MinMaxPriorityQueue.orderedBy(byBucketEntry).expectedSize(initialSize).create();
}
/**
 * Attempt to add the specified entry to this queue.
 * <p>
 * While the tracked size is below the max size the entry is always added.
 * Once the max size has been reached, the entry is only added when it compares
 * greater than the current head of the queue; the head is dropped only if the
 * tracked size would still exceed the max size without it. Otherwise, there is
 * no side effect of this call.
 *
 * @param entry a bucket entry with key to try to add to the queue
 */
public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) {
  // Below the target size: accept unconditionally.
  if (cacheSize < maxSize) {
    queue.add(entry);
    cacheSize += entry.getValue().getLength();
    return;
  }

  final BucketEntry head = queue.peek().getValue();
  if (entry.getValue().compareTo(head) <= 0) {
    // Not ordered after the head: nothing to do.
    return;
  }

  // Tentatively account for swapping the head out for the new entry.
  cacheSize += entry.getValue().getLength();
  cacheSize -= head.getLength();
  if (cacheSize > maxSize) {
    // Still over the target without the head, so the head really goes.
    queue.poll();
  } else {
    // Room for both: undo the head's removal from the tracked size.
    cacheSize += head.getLength();
  }
  queue.add(entry);
}
/**
 * Get the buffer of the block with the specified name.
 *
 * @param key block cache key
 * @param caching passed through to the hit/miss statistics and the delegate cache
 * @param repeat when true, a miss is not recorded in the statistics
 * @param updateCacheMetrics when false, neither hits nor misses are recorded
 *        in this cache's statistics
 * @return buffer of specified block name, or null if not in cache
 */
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
  SingleSizeCache cachedBlock = backingStore.get(key);
  if (cachedBlock == null) {
    // Honour updateCacheMetrics here too, consistent with the miss path below;
    // previously this miss was recorded even when metrics were disabled.
    if (!repeat && updateCacheMetrics) {
      stats.miss(caching);
    }
    return null;
  }

  Cacheable contentBlock = cachedBlock.getBlock(key, caching, false, updateCacheMetrics);

  if (updateCacheMetrics) {
    if (contentBlock != null) {
      stats.hit(caching);
    } else if (!repeat) {
      stats.miss(caching);
    }
  }
  return contentBlock;
}
/**
 * Evicts all blocks for a specific HFile.
 * <p>
 * This is used for evict-on-close to remove all blocks of a specific HFile.
 * The blocks are located by taking the inclusive range of keys for the file
 * between the smallest and largest possible offsets.
 *
 * @param hfileName name of the HFile whose blocks should be evicted
 * @return the number of blocks evicted
 */
@Override
public int evictBlocksByHfileName(String hfileName) {
  final BlockCacheKey lowestKey = new BlockCacheKey(hfileName, Long.MIN_VALUE);
  final BlockCacheKey highestKey = new BlockCacheKey(hfileName, Long.MAX_VALUE);

  int evictedCount = 0;
  for (BlockCacheKey key : blocksByHFile.subSet(lowestKey, true, highestKey, true)) {
    if (evictBlock(key)) {
      evictedCount++;
    }
  }
  return evictedCount;
}
/** * Set up variables and get BucketCache and WriterThread into state where tests can manually * control the running of WriterThread and BucketCache is empty. * @throws Exception */ @Before public void setUp() throws Exception { // Arbitrary capacity. final int capacity = 16; // Run with one writer thread only. Means there will be one writer queue only too. We depend // on this in below. final int writerThreadsCount = 1; this.bc = new MockBucketCache("offheap", capacity, 1, new int [] {1}, writerThreadsCount, capacity, null, 100/*Tolerate ioerrors for 100ms*/); assertEquals(writerThreadsCount, bc.writerThreads.length); assertEquals(writerThreadsCount, bc.writerQueues.size()); // Get reference to our single WriterThread instance. this.wt = bc.writerThreads[0]; this.q = bc.writerQueues.get(0); wt.disableWriter(); this.plainKey = new BlockCacheKey("f", 0); this.plainCacheable = Mockito.mock(Cacheable.class); assertThat(bc.ramCache.isEmpty(), is(true)); assertTrue(q.isEmpty()); }
@Override public Cacheable getBlock(BlockCacheKey key, boolean caching) { CacheablePair contentBlock = backingMap.get(key); if (contentBlock == null) { stats.miss(caching); return null; } stats.hit(caching); // If lock cannot be obtained, that means we're undergoing eviction. try { contentBlock.recentlyAccessed.set(System.nanoTime()); synchronized (contentBlock) { if (contentBlock.serializedData == null) { // concurrently evicted LOG.warn("Concurrent eviction of " + key); return null; } return contentBlock.deserializer .deserialize(contentBlock.serializedData.asReadOnlyBuffer()); } } catch (Throwable t) { LOG.error("Deserializer threw an exception. This may indicate a bug.", t); return null; } }
/**
 * Fills the backing map with ten million synthetic entries, printing progress
 * every 100,000 insertions, then runs {@code freeSpace()} and prints how long
 * it took.
 *
 * @param args the arguments (unused)
 */
public static void main(String[] args) {
  final int entryCount = 10000000;
  final int progressInterval = 100000;
  final String hfilePath =
      "/hbase/TMO_MAY-UPLOADS/f493283255245c912571355f92b328dd/f/ccbd33f3b8a6452aa6cb64ea9a39f520";

  backingMap = new ConcurrentHashMap<BlockCacheKey,BucketEntry>();
  System.out.println("Bucket Cache on-heap overhead per entry calculation");

  final Random random = new Random();
  for (int stored = 0; stored < entryCount; stored++) {
    // A new String object is created per key on purpose, matching the original.
    final BlockCacheKey key = new BlockCacheKey(new String(hfilePath), random.nextLong());
    final BucketEntry entry =
        new BucketEntry(10, 10, System.currentTimeMillis(), random.nextBoolean());
    backingMap.put(key, entry);
    if (stored % progressInterval == 0) {
      System.out.println("Stored " + backingMap.size()+" objects.");
    }
  }

  final long start = System.currentTimeMillis();
  System.out.println("Free space starts: ");
  freeSpace();
  final long elapsedMs = System.currentTimeMillis() - start;
  System.out.println("Time = "+elapsedMs+" ms");
}
/**
 * Orders two block cache keys by their offsets only.
 *
 * @param a first key
 * @param b second key
 * @return -1, 0 or 1 when {@code a}'s offset is less than, equal to, or
 *         greater than {@code b}'s offset
 */
@Override
public int compare(BlockCacheKey a, BlockCacheKey b) {
  final long leftOffset = a.getOffset();
  final long rightOffset = b.getOffset();
  // Explicit comparison rather than subtraction, so large offsets cannot overflow.
  return (leftOffset < rightOffset) ? -1 : ((leftOffset == rightOffset) ? 0 : 1);
}
/**
 * Cache the block to ramCache and hand it to one of the writer queues.
 * <p>
 * Nothing happens if the cache is disabled, or if the key is already present
 * in the backing map or the RAM cache. If the entry cannot be queued, it is
 * removed from the RAM cache again and a failed insert is recorded.
 *
 * @param cacheKey block's cache key
 * @param cachedItem block buffer
 * @param inMemory if block is in-memory
 * @param wait if true, blocking wait (up to the default wait time) when queue is full
 */
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean wait) {
  if (!cacheEnabled || backingMap.containsKey(cacheKey)) {
    return;
  }

  // Stuff the entry into the RAM cache so it can get drained to the persistent store.
  final RAMQueueEntry pending =
      new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
  if (ramCache.putIfAbsent(cacheKey, pending) != null) {
    return;
  }

  // Pick the writer queue from the key's (non-negative) hash.
  final int queueIndex = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
  final BlockingQueue<RAMQueueEntry> writerQueue = writerQueues.get(queueIndex);

  boolean queued = false;
  if (!wait) {
    queued = writerQueue.offer(pending);
  } else {
    try {
      queued = writerQueue.offer(pending, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag; the entry is treated as not queued.
      Thread.currentThread().interrupt();
    }
  }

  if (queued) {
    this.blockNumber.incrementAndGet();
    this.heapSize.addAndGet(cachedItem.heapSize());
    blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
  } else {
    ramCache.remove(cacheKey);
    cacheStats.failInsert();
  }
}
/**
 * Releases the bookkeeping held for an evicted block: frees its space in the
 * bucket allocator, subtracts its length from the real cache size and removes
 * its key from the per-HFile index.
 *
 * @param cacheKey key of the evicted block
 * @param bucketEntry bucket entry of the evicted block
 * @param decrementBlockNumber whether the block counter should also be decremented
 */
@VisibleForTesting
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    boolean decrementBlockNumber) {
  bucketAllocator.freeBlock(bucketEntry.offset());
  realCacheSize.addAndGet(-bucketEntry.getLength());
  blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
  if (!decrementBlockNumber) {
    return;
  }
  this.blockNumber.decrementAndGet();
}
/**
 * Evicts the block stored under the given key from both the RAM cache and the
 * backing map.
 *
 * @param cacheKey key of the block to evict
 * @return true if a block was removed from the RAM cache only or from the
 *         backing map; false if the cache is disabled, nothing was cached under
 *         the key, or the backing-map entry changed before it could be removed
 */
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
  if (!cacheEnabled) {
    return false;
  }

  // Drop any copy still waiting in the RAM cache and undo its accounting.
  RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
  if (removedBlock != null) {
    this.blockNumber.decrementAndGet();
    this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
  }

  BucketEntry bucketEntry = backingMap.get(cacheKey);
  if (bucketEntry == null) {
    if (removedBlock == null) {
      return false;
    }
    cacheStats.evicted(0, cacheKey.isPrimary());
    return true;
  }

  ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
  // Acquire before entering the try block: if lock() itself fails, the finally
  // clause must not attempt to unlock a lock this thread does not hold.
  lock.writeLock().lock();
  try {
    if (!backingMap.remove(cacheKey, bucketEntry)) {
      return false;
    }
    // The block counter was already decremented above if a RAM copy was removed.
    blockEvicted(cacheKey, bucketEntry, removedBlock == null);
  } finally {
    lock.writeLock().unlock();
  }
  cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
  return true;
}
/**
 * Evicts entries from the tail of the queue until at least {@code toFree}
 * bytes have been accounted for or the queue is empty.
 * <p>
 * The length of every polled entry is counted, regardless of the result
 * returned by {@code evictBlock}.
 *
 * @param toFree number of bytes to try to free
 * @return the total length of the entries polled and submitted for eviction
 */
public long free(long toFree) {
  long freedBytes = 0;
  for (Map.Entry<BlockCacheKey, BucketEntry> victim = queue.pollLast();
      victim != null;
      victim = queue.pollLast()) {
    evictBlock(victim.getKey());
    freedBytes += victim.getValue().getLength();
    if (freedBytes >= toFree) {
      break;
    }
  }
  return freedBytes;
}
/**
 * Creates a RAM queue entry holding a block together with its bookkeeping.
 *
 * @param bck cache key of the block
 * @param data the block itself
 * @param accessCounter access counter value assigned to this entry
 * @param inMemory whether the block is flagged as in-memory
 */
public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
    boolean inMemory) {
  // Bookkeeping first, then the key/data pair; the assignments are independent.
  this.inMemory = inMemory;
  this.accessCounter = accessCounter;
  this.data = data;
  this.key = bck;
}
/**
 * Caches the block via the superclass, but first insists that nothing is
 * cached under the key yet.
 *
 * @param cacheKey block cache key
 * @param buf block to cache
 * @param inMemory passed through to the superclass
 * @param cacheDataInL1 passed through to the superclass
 * @throws RuntimeException if the superclass already returns a block for the key
 */
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
    boolean cacheDataInL1) {
  final boolean alreadyCached = super.getBlock(cacheKey, true, false, true) != null;
  if (alreadyCached) {
    throw new RuntimeException("Cached an already cached block");
  }
  super.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
}
/**
 * Caches the block via the superclass, but first insists that nothing is
 * cached under the key yet.
 *
 * @param cacheKey block cache key
 * @param buf block to cache
 * @throws RuntimeException if the superclass already returns a block for the key
 */
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
  final boolean alreadyCached = super.getBlock(cacheKey, true, false, true) != null;
  if (alreadyCached) {
    throw new RuntimeException("Cached an already cached block");
  }
  super.cacheBlock(cacheKey, buf);
}