/**
 * Runs three ten-put batches (rows "row_0" .. "row_9") through {@code region.batchMutate} and
 * checks the per-put status codes together with the "syncTimeNumOps" counter read from the
 * {@link MetricsWALSource} singleton.
 * <ol>
 * <li>All puts valid: every status is SUCCESS and the counter is one above its starting
 * value.</li>
 * <li>Put 5 additionally writes to family "BAD_CF": that put reports BAD_FAMILY, the others
 * SUCCESS, and the counter is two above its starting value.</li>
 * <li>The same batch is submitted from a {@code TestThread} while this thread holds the row
 * lock on "row_2". The test waits (up to 10 seconds) for the counter to move past start + 2,
 * releases the lock, stops the test context, and then expects the counter at start + 4 and the
 * same status codes as in step 2.</li>
 * </ol>
 * The row lock is released in a {@code finally} block so that a timeout or any other failure
 * while it is held cannot leave it locked when the region is closed.
 *
 * @throws Exception if region setup, a mutation, the helper thread, or region close fails
 */
@Test
public void testBatchPut() throws Exception {
  final String syncCounter = "syncTimeNumOps";
  final int numPuts = 10;
  final int badPutIndex = 5;
  final long syncWaitTimeoutMs = 10000;

  byte[] b = Bytes.toBytes(getName());
  byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
  byte[] qual = Bytes.toBytes("qual");
  byte[] val = Bytes.toBytes("val");
  this.region = initHRegion(b, getName(), CONF, cf);
  MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
  try {
    // Baseline: all later counter expectations are relative to this value.
    long syncs = metricsAssertHelper.getCounter(syncCounter, source);
    metricsAssertHelper.assertCounter(syncCounter, syncs, source);

    LOG.info("First a batch put with all valid puts");
    final Put[] puts = new Put[numPuts];
    for (int i = 0; i < numPuts; i++) {
      puts[i] = new Put(Bytes.toBytes("row_" + i));
      puts[i].add(cf, qual, val);
    }

    OperationStatus[] codes = this.region.batchMutate(puts);
    assertEquals(numPuts, codes.length);
    for (int i = 0; i < numPuts; i++) {
      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter(syncCounter, syncs + 1, source);

    LOG.info("Next a batch put with one invalid family");
    puts[badPutIndex].add(Bytes.toBytes("BAD_CF"), qual, val);
    codes = this.region.batchMutate(puts);
    assertEquals(numPuts, codes.length);
    for (int i = 0; i < numPuts; i++) {
      assertEquals((i == badPutIndex) ? OperationStatusCode.BAD_FAMILY
          : OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter(syncCounter, syncs + 2, source);

    LOG.info("Next a batch put that has to break into two batches to avoid a lock");
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
    final AtomicReference<OperationStatus[]> retFromThread =
        new AtomicReference<OperationStatus[]>();
    RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
    try {
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          retFromThread.set(region.batchMutate(puts));
        }
      };
      LOG.info("...starting put thread while holding lock");
      ctx.addThread(putter);
      ctx.startThreads();

      LOG.info("...waiting for put thread to sync first time");
      long startWait = System.currentTimeMillis();
      // Poll until the counter moves off syncs + 2, i.e. the put thread has synced once.
      while (metricsAssertHelper.getCounter(syncCounter, source) == syncs + 2) {
        Thread.sleep(100);
        if (System.currentTimeMillis() - startWait > syncWaitTimeoutMs) {
          fail("Timed out waiting for thread to sync first minibatch");
        }
      }
    } finally {
      // Released on every exit path: on the timeout/exception path the put thread would
      // otherwise still be waiting on this lock when the outer finally closes the region.
      LOG.info("...releasing row lock, which should let put thread continue");
      rowLock.release();
    }

    LOG.info("...joining on thread");
    ctx.stop();

    LOG.info("...checking that next batch was synced");
    metricsAssertHelper.assertCounter(syncCounter, syncs + 4, source);
    codes = retFromThread.get();
    for (int i = 0; i < numPuts; i++) {
      assertEquals((i == badPutIndex) ? OperationStatusCode.BAD_FAMILY
          : OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
}
/**
 * Runs three ten-put batches (rows "row_0" .. "row_9") through {@code region.batchMutate} and
 * checks the per-put status codes together with the "syncTimeNumOps" counter read from the
 * {@link MetricsWALSource} singleton.
 * <ol>
 * <li>All puts valid: every status is SUCCESS and the counter is one above its starting
 * value.</li>
 * <li>Put 5 additionally writes to family "BAD_CF": that put reports BAD_FAMILY, the others
 * SUCCESS, and the counter is two above its starting value.</li>
 * <li>The same batch is submitted from a {@code TestThread} while this thread holds the row
 * lock on "row_2". The test waits (up to 10 seconds) for the counter to move past start + 2,
 * releases the lock, stops the test context, and then expects the counter at start + 4 and the
 * same status codes as in step 2.</li>
 * </ol>
 * The row lock is released in a {@code finally} block so that a timeout or any other failure
 * while it is held cannot leave it locked when the region is closed.
 *
 * @throws Exception if region setup, a mutation, the helper thread, or region close fails
 */
@Test
public void testBatchPut() throws Exception {
  final String syncCounter = "syncTimeNumOps";
  final int numPuts = 10;
  final int badPutIndex = 5;
  final long syncWaitTimeoutMs = 10000;

  byte[] b = Bytes.toBytes(getName());
  byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
  byte[] qual = Bytes.toBytes("qual");
  byte[] val = Bytes.toBytes("val");
  this.region = initHRegion(b, getName(), conf, cf);
  MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
  try {
    // Baseline: all later counter expectations are relative to this value.
    long syncs = metricsAssertHelper.getCounter(syncCounter, source);
    metricsAssertHelper.assertCounter(syncCounter, syncs, source);

    LOG.info("First a batch put with all valid puts");
    final Put[] puts = new Put[numPuts];
    for (int i = 0; i < numPuts; i++) {
      puts[i] = new Put(Bytes.toBytes("row_" + i));
      puts[i].add(cf, qual, val);
    }

    OperationStatus[] codes = this.region.batchMutate(puts);
    assertEquals(numPuts, codes.length);
    for (int i = 0; i < numPuts; i++) {
      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter(syncCounter, syncs + 1, source);

    LOG.info("Next a batch put with one invalid family");
    puts[badPutIndex].add(Bytes.toBytes("BAD_CF"), qual, val);
    codes = this.region.batchMutate(puts);
    assertEquals(numPuts, codes.length);
    for (int i = 0; i < numPuts; i++) {
      assertEquals((i == badPutIndex) ? OperationStatusCode.BAD_FAMILY
          : OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter(syncCounter, syncs + 2, source);

    LOG.info("Next a batch put that has to break into two batches to avoid a lock");
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
    final AtomicReference<OperationStatus[]> retFromThread =
        new AtomicReference<OperationStatus[]>();
    RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
    try {
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          retFromThread.set(region.batchMutate(puts));
        }
      };
      LOG.info("...starting put thread while holding lock");
      ctx.addThread(putter);
      ctx.startThreads();

      LOG.info("...waiting for put thread to sync first time");
      long startWait = System.currentTimeMillis();
      // Poll until the counter moves off syncs + 2, i.e. the put thread has synced once.
      while (metricsAssertHelper.getCounter(syncCounter, source) == syncs + 2) {
        Thread.sleep(100);
        if (System.currentTimeMillis() - startWait > syncWaitTimeoutMs) {
          fail("Timed out waiting for thread to sync first minibatch");
        }
      }
    } finally {
      // Released on every exit path: on the timeout/exception path the put thread would
      // otherwise still be waiting on this lock when the outer finally closes the region.
      LOG.info("...releasing row lock, which should let put thread continue");
      rowLock.release();
    }

    LOG.info("...joining on thread");
    ctx.stop();

    LOG.info("...checking that next batch was synced");
    metricsAssertHelper.assertCounter(syncCounter, syncs + 4, source);
    codes = retFromThread.get();
    for (int i = 0; i < numPuts; i++) {
      assertEquals((i == badPutIndex) ? OperationStatusCode.BAD_FAMILY
          : OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
}