private CompactJobQueue.CompactJob createCompactJob(final CompactionRequest request, final Path writtenPath, HStore store) throws IOException { // check reference file, not supported yet! boolean needToRebuild = false; for (StoreFile sf : request.getFiles()) { if (sf.getPath().getName().indexOf(".") != -1 || sf.isReference()) { needToRebuild = true; break; } } CompactJobQueue.CompactJob job; if (needToRebuild) { job = new CompactJobQueue.RebuildCompactJob(store, request, writtenPath); } else { job = new CompactJobQueue.NormalCompactJob(store, request, writtenPath); } return job; }
public LMDIndexWriter(HStore store, Path rawDataPath, TimeRangeTracker timeRangeTracker, String opType) { tableRelation = store.indexTableRelation; this.rawDataPath = rawDataPath; this.store = store; tracker = timeRangeTracker; this.opType = opType; lmdIndexParameters = store.getLMDIndexParameters(); int size = 0; for (Map.Entry<byte[], TreeSet<byte[]>> entry : tableRelation.getIndexFamilyMap().entrySet()) { size += entry.getValue().size(); } dimensions = size; int[] mins = new int[dimensions]; Arrays.fill(mins, 0); }
/** * Do a small get/scan against one store. This is required because store * has no actual methods of querying itself, and relies on StoreScanner. */ public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName()), // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set // readpoint 0. 0); List<Cell> result = new ArrayList<Cell>(); scanner.next(result); if (!result.isEmpty()) { // verify that we are on the row we want: Cell kv = result.get(0); if (!CellUtil.matchingRow(kv, get.getRow())) { result.clear(); } } scanner.close(); return result; }
/** * @param services Master services instance. * @param htd * @param parent * @param daughter * @param midkey * @param top True if we are to write a 'top' reference. * @return Path to reference we created. * @throws IOException */ private Path createReferences(final MasterServices services, final HTableDescriptor htd, final HRegionInfo parent, final HRegionInfo daughter, final byte [] midkey, final boolean top) throws IOException { Path rootdir = services.getMasterFileSystem().getRootDir(); Path tabledir = FSUtils.getTableDir(rootdir, parent.getTable()); Path storedir = HStore.getStoreHomedir(tabledir, daughter, htd.getColumnFamilies()[0].getName()); Reference ref = top? Reference.createTopReference(midkey): Reference.createBottomReference(midkey); long now = System.currentTimeMillis(); // Reference name has this format: StoreFile#REF_NAME_PARSER Path p = new Path(storedir, Long.toString(now) + "." + parent.getEncodedName()); FileSystem fs = services.getMasterFileSystem().getFileSystem(); ref.write(fs, p); return p; }
@Override public KeyValueScanner preStoreScannerOpen( final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException { TableName tn = store.getTableName(); if (!tn.isSystemTable()) { Long newTtl = ttls.get(store.getTableName()); Integer newVersions = versions.get(store.getTableName()); ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); return new StoreScanner(store, scanInfo, scan, targetCols, ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); } else { return s; } }
@Test public void testPurgeExpiredFiles() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); TEST_UTIL.startMiniCluster(1); try { Store store = prepareData(); assertEquals(10, store.getStorefilesCount()); TEST_UTIL.getHBaseAdmin().majorCompact(tableName); while (store.getStorefilesCount() > 1) { Thread.sleep(100); } assertTrue(store.getStorefilesCount() == 1); } finally { TEST_UTIL.shutdownMiniCluster(); } }
private long testCompactionWithoutThroughputLimit() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, NoLimitCompactionThroughputController.class.getName()); TEST_UTIL.startMiniCluster(1); try { Store store = prepareData(); assertEquals(10, store.getStorefilesCount()); long startTime = System.currentTimeMillis(); TEST_UTIL.getHBaseAdmin().majorCompact(tableName); while (store.getStorefilesCount() != 1) { Thread.sleep(20); } return System.currentTimeMillis() - startTime; } finally { TEST_UTIL.shutdownMiniCluster(); } }
private HTableDescriptor createHtd(boolean isStripe) throws Exception { HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY)); String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName(); htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy); if (isStripe) { htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); if (initialStripeCount != null) { htd.setConfiguration( StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString()); htd.setConfiguration( HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount)); } else { htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500"); } if (splitSize != null) { htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString()); } if (splitParts != null) { htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString()); } } else { htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10"); // default } return htd; }
/** * Do a small get/scan against one store. This is required because store * has no actual methods of querying itself, and relies on StoreScanner. */ public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName()), // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set // readpoint 0. 0); List<Cell> result = new ArrayList<Cell>(); scanner.next(result); if (!result.isEmpty()) { // verify that we are on the row we want: Cell kv = result.get(0); if (!CellUtil.matchingRow(kv, get.getRow())) { result.clear(); } } scanner.close(); return result; }
@Override public KeyValueScanner preStoreScannerOpen( final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException { TableName tn = store.getTableName(); if (!tn.isSystemTable()) { Long newTtl = ttls.get(store.getTableName()); Integer newVersions = versions.get(store.getTableName()); ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); return new StoreScanner(store, scanInfo, scan, targetCols, ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); } else { return s; } }
/** * Gets the number of del cell in the del files * @param paths the del file paths * @return the cell size */ private int countDelCellsInDelFiles(List<Path> paths) throws IOException { List<HStoreFile> sfs = new ArrayList<>(); int size = 0; for (Path path : paths) { HStoreFile sf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true); sfs.add(sf); } List<KeyValueScanner> scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false, HConstants.LATEST_TIMESTAMP)); long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); long ttl = HStore.determineTTLFromFamily(hcd); ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparatorImpl.COMPARATOR); StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners); List<Cell> results = new ArrayList<>(); boolean hasMore = true; while (hasMore) { hasMore = scanner.next(results); size += results.size(); results.clear(); } scanner.close(); return size; }
@Override protected List<KeyValueScanner> selectScannersFrom(HStore store, List<? extends KeyValueScanner> allScanners) { List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners); List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size()); for (KeyValueScanner scanner : scanners) { newScanners.add(new DelegatingKeyValueScanner(scanner) { @Override public boolean reseek(Cell key) throws IOException { if (ON.get()) { REQ_COUNT.incrementAndGet(); if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) { if (IS_DO_NOT_RETRY.get()) { throw new DoNotRetryIOException("Injected exception"); } else { throw new IOException("Injected exception"); } } } return super.reseek(key); } }); } return newScanners; }
/** * Do a small get/scan against one store. This is required because store * has no actual methods of querying itself, and relies on StoreScanner. */ public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set // readpoint 0. 0); List<Cell> result = new ArrayList<>(); scanner.next(result); if (!result.isEmpty()) { // verify that we are on the row we want: Cell kv = result.get(0); if (!CellUtil.matchingRows(kv, get.getRow())) { result.clear(); } } scanner.close(); return result; }
private Path createReferences(final MasterServices services, final TableDescriptor td, final HRegionInfo parent, final HRegionInfo daughter, final byte [] midkey, final boolean top) throws IOException { Path rootdir = services.getMasterFileSystem().getRootDir(); Path tabledir = FSUtils.getTableDir(rootdir, parent.getTable()); Path storedir = HStore.getStoreHomedir(tabledir, daughter, td.getColumnFamilies()[0].getName()); Reference ref = top? Reference.createTopReference(midkey): Reference.createBottomReference(midkey); long now = System.currentTimeMillis(); // Reference name has this format: StoreFile#REF_NAME_PARSER Path p = new Path(storedir, Long.toString(now) + "." + parent.getEncodedName()); FileSystem fs = services.getMasterFileSystem().getFileSystem(); ref.write(fs, p); return p; }
private HStore prepareData() throws IOException { Admin admin = TEST_UTIL.getAdmin(); TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, FIFOCompactionPolicy.class.getName()) .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, DisabledRegionSplitPolicy.class.getName()) .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build()) .build(); admin.createTable(desc); Table table = TEST_UTIL.getConnection().getTable(tableName); TimeOffsetEnvironmentEdge edge = (TimeOffsetEnvironmentEdge) EnvironmentEdgeManager.getDelegate(); for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { byte[] value = new byte[128 * 1024]; ThreadLocalRandom.current().nextBytes(value); table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); } admin.flush(tableName); edge.increment(1001); } return getStoreWithName(tableName); }
@Test public void testPurgeExpiredFiles() throws Exception { HStore store = prepareData(); assertEquals(10, store.getStorefilesCount()); TEST_UTIL.getAdmin().majorCompact(tableName); TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { @Override public boolean evaluate() throws Exception { return store.getStorefilesCount() == 1; } @Override public String explainFailure() throws Exception { return "The store file count " + store.getStorefilesCount() + " is still greater than 1"; } }); }
@Test public void testSanityCheckBlockingStoreFiles() throws IOException { error.expect(DoNotRetryIOException.class); error.expectMessage("Blocking file count 'hbase.hstore.blockingStoreFiles'"); error.expectMessage("is below recommended minimum of 1000 for column family"); TableName tableName = TableName.valueOf(getClass().getSimpleName() + "-BlockingStoreFiles"); TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY, FIFOCompactionPolicy.class.getName()) .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, DisabledRegionSplitPolicy.class.getName()) .setValue(HStore.BLOCKING_STOREFILES_KEY, "10") .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build()) .build(); TEST_UTIL.getAdmin().createTable(desc); }
private HStore prepareData() throws IOException { Admin admin = TEST_UTIL.getAdmin(); if (admin.tableExists(tableName)) { admin.disableTable(tableName); admin.deleteTable(tableName); } Table table = TEST_UTIL.createTable(tableName, family); for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { byte[] value = new byte[128 * 1024]; ThreadLocalRandom.current().nextBytes(value); table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); } admin.flush(tableName); } return getStoreWithName(tableName); }
private long testCompactionWithoutThroughputLimit() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, NoLimitThroughputController.class.getName()); TEST_UTIL.startMiniCluster(1); try { HStore store = prepareData(); assertEquals(10, store.getStorefilesCount()); long startTime = System.currentTimeMillis(); TEST_UTIL.getAdmin().majorCompact(tableName); while (store.getStorefilesCount() != 1) { Thread.sleep(20); } return System.currentTimeMillis() - startTime; } finally { TEST_UTIL.shutdownMiniCluster(); } }
/** * Writes Puts to the table and flushes few times. * @return {@link Pair} of (throughput, duration). */ private Pair<Double, Long> generateAndFlushData(Table table) throws IOException { // Internally, throughput is controlled after every cell write, so keep value size less for // better control. final int NUM_FLUSHES = 3, NUM_PUTS = 50, VALUE_SIZE = 200 * 1024; Random rand = new Random(); long duration = 0; for (int i = 0; i < NUM_FLUSHES; i++) { // Write about 10M (10 times of throughput rate) per iteration. for (int j = 0; j < NUM_PUTS; j++) { byte[] value = new byte[VALUE_SIZE]; rand.nextBytes(value); table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); } long startTime = System.nanoTime(); hbtu.getAdmin().flush(tableName); duration += System.nanoTime() - startTime; } HStore store = getStoreWithName(tableName); assertEquals(NUM_FLUSHES, store.getStorefilesCount()); double throughput = (double)store.getStorefilesSize() / TimeUnit.NANOSECONDS.toSeconds(duration); return new Pair<>(throughput, duration); }
@Override public List<HStore> getStores() { List<HStore> list = new ArrayList<>(stores.size()); /** * This is used to trigger the custom definition (faulty) * of refresh HFiles API. */ try { if (this.store == null) { store = new HStoreWithFaultyRefreshHFilesAPI(this, ColumnFamilyDescriptorBuilder.of(FAMILY), this.conf); } list.add(store); } catch (IOException ioe) { LOG.info("Couldn't instantiate custom store implementation", ioe); } list.addAll(stores.values()); return list; }
/** * Do a small get/scan against one store. This is required because store * has no actual methods of querying itself, and relies on StoreScanner. */ public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { MultiVersionConsistencyControl.resetThreadReadPoint(); Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName())); List<Cell> result = new ArrayList<Cell>(); scanner.next(result); if (!result.isEmpty()) { // verify that we are on the row we want: Cell kv = result.get(0); if (!CellUtil.matchingRow(kv, get.getRow())) { result.clear(); } } scanner.close(); return result; }
public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) { Preconditions.checkNotNull(r); Preconditions.checkNotNull(files); this.r = r; this.s = s; this.compactSelection = files; long sz = 0; for (StoreFile sf : files.getFilesToCompact()) { sz += sf.getReader().length(); } this.totalSize = sz; this.isMajor = isMajor; this.p = p; this.timeInNanos = System.nanoTime(); }
/** * Do a small get/scan against one store. This is required because store * has no actual methods of querying itself, and relies on StoreScanner. */ public static List<KeyValue> getFromStoreFile(HStore store, Get get) throws IOException { MultiVersionConsistencyControl.resetThreadReadPoint(); Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName())); List<KeyValue> result = new ArrayList<KeyValue>(); scanner.next(result); if (!result.isEmpty()) { // verify that we are on the row we want: KeyValue kv = result.get(0); if (!Bytes.equals(kv.getRow(), get.getRow())) { result.clear(); } } scanner.close(); return result; }
/** * @param services Master services instance. * @param htd * @param parent * @param daughter * @param midkey * @param top True if we are to write a 'top' reference. * @return Path to reference we created. * @throws IOException */ private Path createReferences(final MasterServices services, final HTableDescriptor htd, final HRegionInfo parent, final HRegionInfo daughter, final byte [] midkey, final boolean top) throws IOException { Path rootdir = services.getMasterFileSystem().getRootDir(); Path tabledir = HTableDescriptor.getTableDir(rootdir, parent.getTableName()); Path storedir = HStore.getStoreHomedir(tabledir, daughter.getEncodedName(), htd.getColumnFamilies()[0].getName()); Reference ref = top? Reference.createTopReference(midkey): Reference.createBottomReference(midkey); long now = System.currentTimeMillis(); // Reference name has this format: StoreFile#REF_NAME_PARSER Path p = new Path(storedir, Long.toString(now) + "." + parent.getEncodedName()); FileSystem fs = services.getMasterFileSystem().getFileSystem(); ref.write(fs, p); return p; }
@Override public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, HStore store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { Long newTtl = ttls.get(store.getTableName()); if (newTtl != null) { System.out.println("PreFlush:" + newTtl); } Integer newVersions = versions.get(store.getTableName()); HStore.ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); HStore.ScanInfo scanInfo = new HStore.ScanInfo(family.getName(), family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); }
@Override public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, HStore store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException { Long newTtl = ttls.get(store.getTableName()); Integer newVersions = versions.get(store.getTableName()); HStore.ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); HStore.ScanInfo scanInfo = new HStore.ScanInfo(family.getName(), family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion() .getSmallestReadPoint(), earliestPutTs); }
/** * Get the directory to archive a store directory * @param conf {@link Configuration} to read for the archive directory name. * @param region parent region information under which the store currently lives * @param tabledir directory for the table under which the store currently lives * @param family name of the family in the store * @return {@link Path} to the directory to archive the given store or <tt>null</tt> if it should * not be archived */ public static Path getStoreArchivePath(Configuration conf, HRegionInfo region, Path tabledir, byte[] family) throws IOException { TableName tableName = FSUtils.getTableName(tabledir); Path rootDir = FSUtils.getRootDir(conf); Path tableArchiveDir = getTableArchivePath(rootDir, tableName); return HStore.getStoreHomedir(tableArchiveDir, region, family); }
/** * Do a minor/major compaction for store files' index. * * @param compactedFile if null, compact index from this file, else compact each StoreFile's index * together * @return Product of compaction or null if there is no index column cell * @throws IOException */ void compactLMDIndex(final Path compactedFile, HStore store, TimeRangeTracker timeRangeTracker) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug( "Generate intermediate index file from major compaction file=" + compactedFile + " in cf=" + store.toString()); } HFile.Reader reader = HFile.createReader(store.fs.getFileSystem(), compactedFile, store.cacheConf, conf); HFileScanner scanner = reader.getScanner(false, false, true); Queue<KeyValue> rawRecords = new LinkedList<>(); int counter = 0; try { scanner.seekTo(); do { KeyValue kv = (KeyValue) scanner.getKeyValue(); if (store.indexTableRelation.isIndexColumn(kv.getFamily(), kv.getQualifier())) { rawRecords.add(kv); } ++counter; } while (scanner.next()); } finally { if (reader != null) reader.close(); } System.out.println("in compacted file=" + compactedFile + ", number of keyvalue=" + counter + ", for LMDIndex is:" + rawRecords.size()); LMDIndexWriter lmdIndexWriter = new LMDIndexWriter(store, compactedFile, timeRangeTracker, "COMPACT"); lmdIndexWriter.processKeyValueQueue(rawRecords); }
public LCIndexWriter(HStore store, Path hdfsTmpPath, TimeRangeTracker tracker) throws IOException { this.store = store; this.hdfsTmpPath = hdfsTmpPath; this.tableRelation = store.indexTableRelation; indexParameters = store.getLCIndexParameters(); this.statMap = LCStatInfo2.parseStatString(tableRelation, store.getHRegion().getTableDesc().getValue(LCIndexConstant.LC_TABLE_DESC_RANGE_STR)); this.tracker = tracker; Path dirPath = indexParameters.getTmpDirPath(hdfsTmpPath); if (indexParameters.getLCIndexFileSystem().exists(dirPath)) { indexParameters.getLCIndexFileSystem().mkdirs(dirPath); } }