/**
 * Runs a major compaction with {@link NoLimitCompactionThroughputController} configured as the
 * throughput controller and measures how long it takes.
 *
 * @return wall-clock milliseconds between requesting the major compaction and the store
 *         reporting exactly one store file
 * @throws Exception if cluster startup, data preparation or the compaction request fails
 */
private long testCompactionWithoutThroughputLimit() throws Exception {
  final Configuration configuration = TEST_UTIL.getConfiguration();
  final String engineClass = DefaultStoreEngine.class.getName();
  final String controllerClass = NoLimitCompactionThroughputController.class.getName();

  configuration.set(StoreEngine.STORE_ENGINE_CLASS_KEY, engineClass);
  configuration.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
  configuration.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
  configuration.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
  configuration.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
    controllerClass);

  TEST_UTIL.startMiniCluster(1);
  try {
    final Store loadedStore = prepareData();
    // The timing below is only meaningful if we start from the expected 10 store files.
    assertEquals(10, loadedStore.getStorefilesCount());

    final long compactionRequestedAt = System.currentTimeMillis();
    TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
    // Poll until the store is down to a single file.
    while (loadedStore.getStorefilesCount() != 1) {
      Thread.sleep(20);
    }
    return System.currentTimeMillis() - compactionRequestedAt;
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
private HTableDescriptor createHtd(boolean isStripe) throws Exception { HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY)); String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName(); htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy); if (isStripe) { htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); if (initialStripeCount != null) { htd.setConfiguration( StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString()); htd.setConfiguration( HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount)); } else { htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500"); } if (splitSize != null) { htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString()); } if (splitParts != null) { htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString()); } } else { htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10"); // default } return htd; }
/**
 * Runs a major compaction with {@link NoLimitThroughputController} configured as the compaction
 * throughput controller and measures how long it takes.
 *
 * @return wall-clock milliseconds between requesting the major compaction and the store
 *         reporting exactly one store file
 * @throws Exception if cluster startup, data preparation or the compaction request fails
 */
private long testCompactionWithoutThroughputLimit() throws Exception {
  final Configuration configuration = TEST_UTIL.getConfiguration();
  final String engineClass = DefaultStoreEngine.class.getName();
  final String controllerClass = NoLimitThroughputController.class.getName();

  configuration.set(StoreEngine.STORE_ENGINE_CLASS_KEY, engineClass);
  configuration.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
  configuration.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
  configuration.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
  configuration.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
    controllerClass);

  TEST_UTIL.startMiniCluster(1);
  try {
    final HStore loadedStore = prepareData();
    // The timing below is only meaningful if we start from the expected 10 store files.
    assertEquals(10, loadedStore.getStorefilesCount());

    final long compactionRequestedAt = System.currentTimeMillis();
    TEST_UTIL.getAdmin().majorCompact(tableName);
    // Poll until the store is down to a single file.
    while (loadedStore.getStorefilesCount() != 1) {
      Thread.sleep(20);
    }
    return System.currentTimeMillis() - compactionRequestedAt;
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
private long testCompactionWithThroughputLimit() throws Exception { long throughputLimit = 1024L * 1024; Configuration conf = TEST_UTIL.getConfiguration(); conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); conf.setLong( PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, throughputLimit); conf.setLong( PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, throughputLimit); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, PressureAwareCompactionThroughputController.class.getName()); TEST_UTIL.startMiniCluster(1); try { Store store = prepareData(); assertEquals(10, store.getStorefilesCount()); long startTime = System.currentTimeMillis(); TEST_UTIL.getHBaseAdmin().majorCompact(tableName); while (store.getStorefilesCount() != 1) { Thread.sleep(20); } long duration = System.currentTimeMillis() - startTime; double throughput = (double) store.getStorefilesSize() / duration * 1000; // confirm that the speed limit work properly(not too fast, and also not too slow) // 20% is the max acceptable error rate. assertTrue(throughput < throughputLimit * 1.2); assertTrue(throughput > throughputLimit * 0.8); return System.currentTimeMillis() - startTime; } finally { TEST_UTIL.shutdownMiniCluster(); } }
@Override protected void initTable() throws IOException { // Do the same as the LoadTestTool does, but with different table configuration. HTableDescriptor htd = new HTableDescriptor(getTablename()); htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "100"); HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.DEFAULT_COLUMN_FAMILY); HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), htd, hcd); }
@Override protected void initTable() throws IOException { // Do the same as the LoadTestTool does, but with different table configuration. HTableDescriptor htd = new HTableDescriptor(getTablename()); htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "100"); HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.COLUMN_FAMILY); HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), htd, hcd); }
private long testCompactionWithThroughputLimit() throws Exception { long throughputLimit = 1024L * 1024; Configuration conf = TEST_UTIL.getConfiguration(); conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); conf.setLong( PressureAwareCompactionThroughputController .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, throughputLimit); conf.setLong( PressureAwareCompactionThroughputController .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, throughputLimit); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, PressureAwareCompactionThroughputController.class.getName()); TEST_UTIL.startMiniCluster(1); try { HStore store = prepareData(); assertEquals(10, store.getStorefilesCount()); long startTime = System.currentTimeMillis(); TEST_UTIL.getAdmin().majorCompact(tableName); while (store.getStorefilesCount() != 1) { Thread.sleep(20); } long duration = System.currentTimeMillis() - startTime; double throughput = (double) store.getStorefilesSize() / duration * 1000; // confirm that the speed limit work properly(not too fast, and also not too slow) // 20% is the max acceptable error rate. assertTrue(throughput < throughputLimit * 1.2); assertTrue(throughput > throughputLimit * 0.8); return System.currentTimeMillis() - startTime; } finally { TEST_UTIL.shutdownMiniCluster(); } }
/**
 * Runs the flush throughput limit test with {@link StripeStoreEngine} configured as the store
 * engine.
 */
@Test
public void testFlushControlForStripedStore() throws Exception {
  final String stripeEngine = StripeStoreEngine.class.getName();
  hbtu.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY, stripeEngine);
  testFlushWithThroughputLimit();
}
@Override protected void initTable() throws IOException { // Do the same as the LoadTestTool does, but with different table configuration. HTableDescriptor htd = new HTableDescriptor(getTablename()); htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "100"); HColumnDescriptor hcd = new HColumnDescriptor(HFileTestUtil.DEFAULT_COLUMN_FAMILY); HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), htd, hcd); }
/**
 * Test the tuning task of {@link PressureAwareCompactionThroughputController}.
 *
 * <p>The controller is configured with a lower bound of 10 MB and a higher bound of 20 MB
 * (in the controller's throughput unit), a minimum of 4 files per compaction and a blocking
 * store file count of 6. Compaction is disabled on the table, store files are added one flush
 * at a time, and after each step the test asserts the controller's {@code maxThroughput}.
 * Finally the controller is swapped for {@link NoLimitCompactionThroughputController} through
 * an online configuration change.
 *
 * <p>The connection and the table are closed via try-with-resources, and the mini cluster is
 * shut down in a {@code finally} block, so that a failure while creating or closing either
 * resource cannot leave the cluster running.
 */
@Test
public void testThroughputTuning() throws Exception {
  Configuration conf = TEST_UTIL.getConfiguration();
  conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
  conf.setLong(
    PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
    20L * 1024 * 1024);
  conf.setLong(
    PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
    10L * 1024 * 1024);
  conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
  conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
  conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
    PressureAwareCompactionThroughputController.class.getName());
  conf.setInt(
    PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
    1000);
  TEST_UTIL.startMiniCluster(1);
  // The connection is opened inside the try so the finally below also covers a failure here.
  try (Connection conn = ConnectionFactory.createConnection(conf)) {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(family));
    htd.setCompactionEnabled(false);
    TEST_UTIL.getHBaseAdmin().createTable(htd);
    TEST_UTIL.waitTableAvailable(tableName);
    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
    PressureAwareCompactionThroughputController throughputController =
      (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
        .getCompactionThroughputController();
    // With no store files the controller is expected to sit at the lower bound.
    assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
    try (Table table = conn.getTable(tableName)) {
      for (int i = 0; i < 5; i++) {
        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
      // Wait longer than the configured tune period (1000) before checking the new limit.
      Thread.sleep(2000);
      assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      // After the seventh flush the test expects the limit to be lifted entirely.
      assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);

      // Switch the controller implementation online and check the old one was stopped.
      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
        NoLimitCompactionThroughputController.class.getName());
      regionServer.compactSplitThread.onConfigurationChange(conf);
      assertTrue(throughputController.isStopped());
      assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
        instanceof NoLimitCompactionThroughputController);
    }
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
/**
 * Test the logic that we calculate compaction pressure for a striped store.
 *
 * <p>The store is configured with {@link StripeStoreEngine}, 2 initial stripes, a minimum of
 * 4 files and a blocking store file count of 12, with flush-to-L0 turned off. Compaction is
 * disabled on the table. Each round writes one row with a key below 100 and one with a key
 * of 100 or above, then flushes; the assertions expect two new store files per flush
 * (NOTE(review): presumably one per stripe — confirm) and check the reported compaction
 * pressure at 0, 8, 10, 12 and 14 store files.
 *
 * <p>The connection and the table are closed via try-with-resources, and the mini cluster is
 * shut down in a {@code finally} block, so that a failure while creating or closing either
 * resource cannot leave the cluster running.
 */
@Test
public void testGetCompactionPressureForStripedStore() throws Exception {
  Configuration conf = TEST_UTIL.getConfiguration();
  conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
  conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
  conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
  conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
  conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
  TEST_UTIL.startMiniCluster(1);
  // The connection is opened inside the try so the finally below also covers a failure here.
  try (Connection conn = ConnectionFactory.createConnection(conf)) {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(family));
    htd.setCompactionEnabled(false);
    TEST_UTIL.getHBaseAdmin().createTable(htd);
    TEST_UTIL.waitTableAvailable(tableName);
    HStore store = (HStore) getStoreWithName(tableName);
    assertEquals(0, store.getStorefilesCount());
    assertEquals(0.0, store.getCompactionPressure(), EPSILON);
    try (Table table = conn.getTable(tableName)) {
      for (int i = 0; i < 4; i++) {
        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
        table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
      assertEquals(8, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);

      table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(10, store.getStorefilesCount());
      assertEquals(0.5, store.getCompactionPressure(), EPSILON);

      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(12, store.getStorefilesCount());
      assertEquals(1.0, store.getCompactionPressure(), EPSILON);

      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(14, store.getStorefilesCount());
      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
    }
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
/**
 * Test the tuning task of {@link PressureAwareFlushThroughputController}.
 *
 * <p>The test sets throughput bounds of 20 MB (max) and 10 MB (min) through
 * {@code setMaxMinThroughputs}, flushes every region so flush pressure is zero, then writes
 * 100 rows of 256 KB random data and asserts that the controller's max throughput equals
 * {@code 10 MB * (1 + flushPressure)}. Finally the controller is swapped for
 * {@link NoLimitThroughputController} through an online configuration change.
 *
 * <p>The connection and the table are closed via try-with-resources so they are released
 * even when an assertion fails. This method does not shut the mini cluster down itself
 * (NOTE(review): presumably handled by the class's teardown — confirm).
 */
@Test
public void testFlushThroughputTuning() throws Exception {
  Configuration conf = hbtu.getConfiguration();
  setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024);
  conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
  conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
    3000);
  hbtu.startMiniCluster(1);
  try (Connection conn = ConnectionFactory.createConnection(conf)) {
    hbtu.getAdmin().createTable(TableDescriptorBuilder.newBuilder(tableName)
      .addColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
      .build());
    hbtu.waitTableAvailable(tableName);
    HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName);
    PressureAwareFlushThroughputController throughputController =
      (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
    // Flush everything first so the pressure baseline asserted below is zero.
    for (HRegion region : regionServer.getRegions()) {
      region.flush(true);
    }
    assertEquals(0.0, regionServer.getFlushPressure(), EPSILON);
    // Wait longer than the configured tune period (3000) before reading the tuned limit.
    Thread.sleep(5000);
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(hbtu.getConfiguration());
    if (tablesOnMaster) {
      // If no tables on the master, this math is off and I'm not sure what it is supposed to be
      // when meta is on the regionserver and not on the master.
      assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
    }
    try (Table table = conn.getTable(tableName)) {
      Random rand = new Random();
      for (int i = 0; i < 10; i++) {
        for (int j = 0; j < 10; j++) {
          byte[] value = new byte[256 * 1024];
          rand.nextBytes(value);
          table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
        }
      }
      Thread.sleep(5000);
      double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
      assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);

      // Switch the controller implementation online and check the old one was stopped.
      conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
        NoLimitThroughputController.class.getName());
      regionServer.onConfigurationChange(conf);
      assertTrue(throughputController.isStopped());
      assertTrue(
        regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
    }
  }
}