private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { int refCount; while (iterator.hasNext()) { CachedBlock next = iterator.next(); BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); if (cache instanceof BucketCache) { refCount = ((BucketCache) cache).getRefCount(cacheKey); } else if (cache instanceof CombinedBlockCache) { refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); } else { continue; } assertEquals(0, refCount); } }
/** * Configurable constructor. Use this constructor if not using defaults. * @param maxSize maximum size of this cache, in bytes * @param blockSize expected average size of blocks, in bytes * @param evictionThread whether to run evictions in a bg thread or not * @param mapInitialSize initial size of backing ConcurrentHashMap * @param mapLoadFactor initial load factor of backing ConcurrentHashMap * @param mapConcurrencyLevel initial concurrency factor for backing CHM * @param minFactor percentage of total size that eviction will evict until * @param acceptableFactor percentage of total size that triggers eviction * @param singleFactor percentage of total size for single-access blocks * @param multiFactor percentage of total size for multiple-access blocks * @param memoryFactor percentage of total size for in-memory blocks */ public OnHeapBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor) { if(singleFactor + multiFactor + memoryFactor != 1) { throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0"); } if(minFactor >= acceptableFactor) { throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); } if(minFactor >= 1.0f || acceptableFactor >= 1.0f) { throw new IllegalArgumentException("all factors must be < 1"); } this.maxSize = maxSize; this.blockSize = blockSize; map = new ConcurrentHashMap<BlockCacheKey,CachedBlock>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); this.minFactor = minFactor; this.acceptableFactor = acceptableFactor; this.singleFactor = singleFactor; this.multiFactor = multiFactor; this.memoryFactor = memoryFactor; this.stats = new CacheStats(); this.count = new AtomicLong(0); this.elements = new AtomicLong(0); this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); this.size = new AtomicLong(this.overhead); if(evictionThread) { this.evictionThread = new EvictionThread(this); this.evictionThread.start(); // FindBugs SC_START_IN_CTOR } else { this.evictionThread = null; } }
/** * Get the buffer of the block with the specified name. * * @param cacheKey block's cache key * @param caching true if the caller caches blocks on cache misses * @param repeat Whether this is a repeat lookup for the same block * (used to avoid double counting cache misses when doing double-check locking) * @return buffer of specified cache key, or null if not in cache * {@see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType)} */ @Override public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) { CachedBlock cb = map.get(cacheKey); if(cb == null) { if (!repeat) stats.miss(caching); return null; } stats.hit(caching); cb.access(count.incrementAndGet()); return cb.getBuffer(); }
@Override public boolean evictBlock(BlockCacheKey cacheKey) { CachedBlock cb = map.get(cacheKey); if (cb == null) return false; evictBlock(cb); return true; }
/** * Evict block. * * @param block the block * @return the long */ protected long evictBlock(CachedBlock block) { map.remove(block.getCacheKey()); updateSizeMetrics(block, true); elements.decrementAndGet(); stats.evicted(); return block.heapSize(); }
/** * Free. * * @param toFree the to free * @return the long */ public long free(long toFree) { CachedBlock cb; long freedBytes = 0; while ((cb = queue.pollLast()) != null) { freedBytes += evictBlock(cb); if (freedBytes >= toFree) { return freedBytes; } } return freedBytes; }
@Override public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) throws IOException { Map<String, Path> sfMap = FSUtils.getTableStoreFilePathMap( FileSystem.get(conf), FSUtils.getRootDir(conf)); // quirky, but it's a compound key and this is a shortcut taken instead of // creating a class that would represent only a key. Map<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary> bcs = new HashMap<BlockCacheColumnFamilySummary, BlockCacheColumnFamilySummary>(); for (CachedBlock cb : map.values()) { String sf = cb.getCacheKey().getHfileName(); Path path = sfMap.get(sf); if ( path != null) { BlockCacheColumnFamilySummary lookup = BlockCacheColumnFamilySummary.createFromStoreFilePath(path); BlockCacheColumnFamilySummary bcse = bcs.get(lookup); if (bcse == null) { bcse = BlockCacheColumnFamilySummary.create(lookup); bcs.put(lookup,bcse); } bcse.incrementBlocks(); bcse.incrementHeapSize(cb.heapSize()); } } List<BlockCacheColumnFamilySummary> list = new ArrayList<BlockCacheColumnFamilySummary>(bcs.values()); Collections.sort( list ); return list; }
@Override public Iterator<CachedBlock> iterator() { return null; }
private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero) throws InterruptedException { int counter = NO_OF_THREADS; if (CustomInnerRegionObserver.waitForGets.get()) { // Because only one row is selected, it has only 2 blocks counter = counter - 1; while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { Thread.sleep(100); } } else { while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { Thread.sleep(100); } } Iterator<CachedBlock> iterator = cache.iterator(); int refCount = 0; while (iterator.hasNext()) { CachedBlock next = iterator.next(); BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); if (cache instanceof BucketCache) { refCount = ((BucketCache) cache).getRefCount(cacheKey); } else if (cache instanceof CombinedBlockCache) { refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); } else { continue; } System.out.println(" the refcount is " + refCount + " block is " + cacheKey); if (CustomInnerRegionObserver.waitForGets.get()) { if (expectOnlyZero) { assertTrue(refCount == 0); } if (refCount != 0) { // Because the scan would have also touched up on these blocks but // it // would have touched // all 3 if (getClosed) { // If get has closed only the scan's blocks would be available assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); } else { assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); } } } else { // Because the get would have also touched up on these blocks but it // would have touched // upon only 2 additionally if (expectOnlyZero) { assertTrue(refCount == 0); } if (refCount != 0) { if (getLatch == null) { assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); } else { assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); } } } } CustomInnerRegionObserver.getCdl().get().countDown(); }
/** * Helper function that updates the local size counter and also updates any * per-cf or per-blocktype metrics it can discern from given. * * @param cb the cb * @param evict the evict * @return the long * {@link CachedBlock} */ protected long updateSizeMetrics(CachedBlock cb, boolean evict) { long heapsize = cb.heapSize(); if (evict) { heapsize *= -1; } return size.addAndGet(heapsize); }
/** * Adds the. * * @param block the block */ public void add(CachedBlock block) { totalSize += block.heapSize(); queue.add(block); }