Java 类org.apache.hadoop.hbase.regionserver.HRegion.RowLock 实例源码

项目:HIndex    文件:TestHRegion.java   
/**
 * Runs {@code batchMutate} three times over the same ten puts and checks both the
 * per-mutation status codes and the {@code syncTimeNumOps} counter after each run:
 * <ol>
 *   <li>all puts valid: every status is SUCCESS and the counter advances by one;</li>
 *   <li>put 5 also targets an unknown family: only index 5 reports BAD_FAMILY and the
 *       counter advances by one more;</li>
 *   <li>the same batch is submitted from a second thread while this thread holds the row
 *       lock on {@code row_2}: the test waits for the counter to move, releases the lock,
 *       joins the thread and expects the counter to have advanced by two in total.</li>
 * </ol>
 */
@Test
public void testBatchPut() throws Exception {
  byte[] b = Bytes.toBytes(getName());
  byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
  byte[] qual = Bytes.toBytes("qual");
  byte[] val = Bytes.toBytes("val");
  this.region = initHRegion(b, getName(), CONF, cf);
  MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
  try {
    // Baseline for the counter; every later assertion is relative to this value.
    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);

    LOG.info("First a batch put with all valid puts");
    final Put[] puts = new Put[10];
    for (int i = 0; i < 10; i++) {
      puts[i] = new Put(Bytes.toBytes("row_" + i));
      puts[i].add(cf, qual, val);
    }

    OperationStatus[] codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

    LOG.info("Next a batch put with one invalid family");
    puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
    codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }

    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);

    LOG.info("Next a batch put that has to break into two batches to avoid a lock");
    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
    MultithreadedTestUtil.TestContext ctx;
    RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
    try {
      ctx = new MultithreadedTestUtil.TestContext(CONF);
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          retFromThread.set(region.batchMutate(puts));
        }
      };
      LOG.info("...starting put thread while holding lock");
      ctx.addThread(putter);
      ctx.startThreads();

      LOG.info("...waiting for put thread to sync first time");
      long startWait = System.currentTimeMillis();
      // Poll until the counter leaves syncs + 2, giving up after ten seconds.
      while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
        Thread.sleep(100);
        if (System.currentTimeMillis() - startWait > 10000) {
          fail("Timed out waiting for thread to sync first minibatch");
        }
      }
    } finally {
      // Release on every exit path: a timeout or interrupt above must not leave the row
      // locked while the region is being closed by the outer finally.
      LOG.info("...releasing row lock, which should let put thread continue");
      rowLock.release();
    }
    LOG.info("...joining on thread");
    ctx.stop();
    LOG.info("...checking that next batch was synced");
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
    codes = retFromThread.get();
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }

  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
}
项目:PyroDB    文件:TestHRegion.java   
/**
 * Checks status codes and the {@code syncTimeNumOps} counter across three
 * {@code batchMutate} calls on the same ten puts: an all-valid batch, a batch in which
 * put 5 also targets an unknown column family, and the same batch issued from a second
 * thread while this thread holds the row lock on {@code row_2}.
 */
@Test
public void testBatchPut() throws Exception {
  byte[] b = Bytes.toBytes(getName());
  byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
  byte[] qual = Bytes.toBytes("qual");
  byte[] val = Bytes.toBytes("val");
  this.region = initHRegion(b, getName(), CONF, cf);
  MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
  try {
    // Counter value before any mutation; later assertions are offsets from it.
    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);

    LOG.info("First a batch put with all valid puts");
    final Put[] puts = new Put[10];
    for (int i = 0; i < 10; i++) {
      puts[i] = new Put(Bytes.toBytes("row_" + i));
      puts[i].add(cf, qual, val);
    }

    OperationStatus[] codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

    LOG.info("Next a batch put with one invalid family");
    puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
    codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }

    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);

    LOG.info("Next a batch put that has to break into two batches to avoid a lock");
    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
    MultithreadedTestUtil.TestContext ctx;
    RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
    try {
      ctx = new MultithreadedTestUtil.TestContext(CONF);
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          retFromThread.set(region.batchMutate(puts));
        }
      };
      LOG.info("...starting put thread while holding lock");
      ctx.addThread(putter);
      ctx.startThreads();

      LOG.info("...waiting for put thread to sync first time");
      long startWait = System.currentTimeMillis();
      // Spin (100 ms steps, 10 s cap) until the counter is no longer syncs + 2.
      while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
        Thread.sleep(100);
        if (System.currentTimeMillis() - startWait > 10000) {
          fail("Timed out waiting for thread to sync first minibatch");
        }
      }
    } finally {
      // Always give the row lock back, including when the wait above fails or is
      // interrupted; otherwise the lock is still held when the region gets closed.
      LOG.info("...releasing row lock, which should let put thread continue");
      rowLock.release();
    }
    LOG.info("...joining on thread");
    ctx.stop();
    LOG.info("...checking that next batch was synced");
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
    codes = retFromThread.get();
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }

  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
}
项目:c5    文件:TestHRegion.java   
/**
 * Drives {@code batchMutate} through a valid batch, a batch with one bad column family,
 * and a batch submitted from another thread while the row lock on {@code row_2} is held
 * here, asserting the returned status codes and the {@code syncTimeNumOps} counter at
 * each step.
 */
@Test
public void testBatchPut() throws Exception {
  byte[] b = Bytes.toBytes(getName());
  byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
  byte[] qual = Bytes.toBytes("qual");
  byte[] val = Bytes.toBytes("val");
  this.region = initHRegion(b, getName(), conf, cf);
  MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
  try {
    // Starting counter value; the expectations below are expressed relative to it.
    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);

    LOG.info("First a batch put with all valid puts");
    final Put[] puts = new Put[10];
    for (int i = 0; i < 10; i++) {
      puts[i] = new Put(Bytes.toBytes("row_" + i));
      puts[i].add(cf, qual, val);
    }

    OperationStatus[] codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
    }
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

    LOG.info("Next a batch put with one invalid family");
    puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
    codes = this.region.batchMutate(puts);
    assertEquals(10, codes.length);
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }

    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);

    LOG.info("Next a batch put that has to break into two batches to avoid a lock");
    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
    MultithreadedTestUtil.TestContext ctx;
    RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
    try {
      ctx = new MultithreadedTestUtil.TestContext(conf);
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          retFromThread.set(region.batchMutate(puts));
        }
      };
      LOG.info("...starting put thread while holding lock");
      ctx.addThread(putter);
      ctx.startThreads();

      LOG.info("...waiting for put thread to sync first time");
      long startWait = System.currentTimeMillis();
      // Wait for the counter to change from syncs + 2; fail the test after ten seconds.
      while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
        Thread.sleep(100);
        if (System.currentTimeMillis() - startWait > 10000) {
          fail("Timed out waiting for thread to sync first minibatch");
        }
      }
    } finally {
      // The lock must be released even if the wait times out or is interrupted, so the
      // region is not closed below with row_2 still locked by this thread.
      LOG.info("...releasing row lock, which should let put thread continue");
      rowLock.release();
    }
    LOG.info("...joining on thread");
    ctx.stop();
    LOG.info("...checking that next batch was synced");
    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
    codes = retFromThread.get();
    for (int i = 0; i < 10; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }

  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
}