Java 类org.apache.hadoop.hbase.client.RegionServerCallable 实例源码

项目:pbase    文件:RegionCoprocessorRpcChannel.java   
/**
 * Sends {@code request} as a coprocessor service call for {@code method}, addressed by this
 * channel's {@code row}, and decodes the reply into a message of the prototype's type.
 *
 * <p>The region reported in the reply is stored in {@code lastRegion}.
 *
 * @param method descriptor supplying the service full name and the method name of the call
 * @param request message whose serialized bytes become the call payload
 * @param responsePrototype prototype used to build the returned message
 * @return the reply value merged into a new message of the prototype's type, or the
 *     prototype's default instance when the reply carries no value
 * @throws IOException if the call or the decoding of the reply fails
 * @throws IllegalArgumentException if {@code row} is null
 */
@Override
protected Message callExecService(Descriptors.MethodDescriptor method,
                                Message request, Message responsePrototype)
    throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Call: " + method.getName() + ", " + request.toString());
  }

  // The row is what the callable below is constructed with; refuse to go on without it.
  if (row == null) {
    throw new IllegalArgumentException("Missing row property for remote region location");
  }

  // Assemble the call description one field at a time.
  ClientProtos.CoprocessorServiceCall.Builder callBuilder =
      ClientProtos.CoprocessorServiceCall.newBuilder();
  callBuilder.setRow(ByteStringer.wrap(row));
  callBuilder.setServiceName(method.getService().getFullName());
  callBuilder.setMethodName(method.getName());
  callBuilder.setRequest(request.toByteString());
  final ClientProtos.CoprocessorServiceCall serviceCall = callBuilder.build();

  RegionServerCallable<CoprocessorServiceResponse> regionCallable =
      new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
        public CoprocessorServiceResponse call(int callTimeout) throws Exception {
          // Address the call to the region name of the callable's current location.
          byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
          return ProtobufUtil.execService(getStub(), serviceCall, targetRegion);
        }
      };
  CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()
      .callWithRetries(regionCallable, operationTimeout);

  // A reply without a value maps to the prototype's default instance.
  Message response = result.getValue().hasValue()
      ? responsePrototype.newBuilderForType().mergeFrom(result.getValue().getValue()).build()
      : responsePrototype.getDefaultInstanceForType();

  lastRegion = result.getRegion().getValue().toByteArray();
  if (LOG.isTraceEnabled()) {
    LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
  }
  return response;
}
项目:PyroDB    文件:RegionCoprocessorRpcChannel.java   
/**
 * Sends {@code request} as a coprocessor service call for {@code method}, addressed by this
 * channel's {@code row}, and decodes the reply into a message of the prototype's type.
 *
 * <p>The region reported in the reply is stored in {@code lastRegion}.
 *
 * @param method descriptor supplying the service full name and the method name of the call
 * @param request message whose serialized bytes become the call payload
 * @param responsePrototype prototype used to build the returned message
 * @return the reply value merged into a new message of the prototype's type, or the
 *     prototype's default instance when the reply carries no value
 * @throws IOException if the call or the decoding of the reply fails
 * @throws IllegalArgumentException if {@code row} is null
 */
@Override
protected Message callExecService(Descriptors.MethodDescriptor method,
                                Message request, Message responsePrototype)
    throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Call: " + method.getName() + ", " + request.toString());
  }

  // The row is what the callable below is constructed with; refuse to go on without it.
  if (row == null) {
    throw new IllegalArgumentException("Missing row property for remote region location");
  }

  // Assemble the call description one field at a time.
  ClientProtos.CoprocessorServiceCall.Builder callBuilder =
      ClientProtos.CoprocessorServiceCall.newBuilder();
  callBuilder.setRow(HBaseZeroCopyByteString.wrap(row));
  callBuilder.setServiceName(method.getService().getFullName());
  callBuilder.setMethodName(method.getName());
  callBuilder.setRequest(request.toByteString());
  final ClientProtos.CoprocessorServiceCall serviceCall = callBuilder.build();

  RegionServerCallable<CoprocessorServiceResponse> regionCallable =
      new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
        public CoprocessorServiceResponse call(int callTimeout) throws Exception {
          // Address the call to the region name of the callable's current location.
          byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
          return ProtobufUtil.execService(getStub(), serviceCall, targetRegion);
        }
      };
  CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()
      .callWithRetries(regionCallable, operationTimeout);

  // A reply without a value maps to the prototype's default instance.
  Message response = result.getValue().hasValue()
      ? responsePrototype.newBuilderForType().mergeFrom(result.getValue().getValue()).build()
      : responsePrototype.getDefaultInstanceForType();

  lastRegion = result.getRegion().getValue().toByteArray();
  if (LOG.isTraceEnabled()) {
    LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
  }
  return response;
}
项目:c5    文件:RegionCoprocessorRpcChannel.java   
/**
 * Sends {@code request} as a coprocessor service call for {@code method}, addressed by this
 * channel's {@code row}, and decodes the reply into a message of the prototype's type.
 *
 * <p>The region reported in the reply is stored in {@code lastRegion}.
 *
 * @param method descriptor supplying the service full name and the method name of the call
 * @param request message whose serialized bytes become the call payload
 * @param responsePrototype prototype used to build the returned message
 * @return the reply value merged into a new message of the prototype's type, or the
 *     prototype's default instance when the reply carries no value
 * @throws IOException if the call or the decoding of the reply fails
 * @throws IllegalArgumentException if {@code row} is null
 */
@Override
protected Message callExecService(Descriptors.MethodDescriptor method,
                                Message request, Message responsePrototype)
    throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Call: " + method.getName() + ", " + request.toString());
  }

  // The row is what the callable below is constructed with; refuse to go on without it.
  if (row == null) {
    throw new IllegalArgumentException("Missing row property for remote region location");
  }

  // Assemble the call description one field at a time.
  ClientProtos.CoprocessorServiceCall.Builder callBuilder =
      ClientProtos.CoprocessorServiceCall.newBuilder();
  callBuilder.setRow(ZeroCopyLiteralByteString.wrap(row));
  callBuilder.setServiceName(method.getService().getFullName());
  callBuilder.setMethodName(method.getName());
  callBuilder.setRequest(request.toByteString());
  final ClientProtos.CoprocessorServiceCall serviceCall = callBuilder.build();

  RegionServerCallable<CoprocessorServiceResponse> regionCallable =
      new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
        public CoprocessorServiceResponse call() throws Exception {
          // Address the call to the region name of the callable's current location.
          byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
          return ProtobufUtil.execService(getStub(), serviceCall, targetRegion);
        }
      };
  CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()
      .callWithRetries(regionCallable);

  // A reply without a value maps to the prototype's default instance.
  Message response = result.getValue().hasValue()
      ? responsePrototype.newBuilderForType().mergeFrom(result.getValue().getValue()).build()
      : responsePrototype.getDefaultInstanceForType();

  lastRegion = result.getRegion().getValue().toByteArray();
  if (LOG.isTraceEnabled()) {
    LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
  }
  return response;
}
项目:ditb    文件:TestHRegionServerBulkLoad.java   
/**
 * Performs one bulk-load round: writes one HFile per column family into a fresh directory,
 * asks the region server hosting row "aaa" to bulk load them, and, whenever the bulk-load
 * counter is a multiple of 10, additionally requests a compaction of that region.
 *
 * @throws Exception if creating the files or either remote call fails
 */
public void doAnAction() throws Exception {
  long iteration = numBulkLoads.getAndIncrement();
  String dirName = String.format("bulkLoad_%08d", iteration);
  Path bulkDir = UTIL.getDataTestDirOnTestFS(dirName);

  // Write one HFile per column family; every file carries the iteration number as its value.
  FileSystem testFs = UTIL.getTestFileSystem();
  byte[] value = Bytes.toBytes(String.format("%010d", iteration));
  final List<Pair<byte[], String>> familyPaths =
      new ArrayList<Pair<byte[], String>>(NUM_CFS);
  for (int cf = 0; cf < NUM_CFS; cf++) {
    Path hfile = new Path(bulkDir, family(cf));
    byte[] familyBytes = Bytes.toBytes(family(cf));
    createHFile(testFs, hfile, familyBytes, QUAL, value, 1000);
    familyPaths.add(new Pair<byte[], String>(familyBytes, hfile.toString()));
  }

  // Hand the freshly written files to the region server.
  final HConnection conn = UTIL.getHBaseAdmin().getConnection();
  RegionServerCallable<Void> bulkLoad =
      new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
    @Override
    public Void call(int callTimeout) throws Exception {
      LOG.debug("Going to connect to server " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
      BulkLoadHFileRequest loadRequest =
          RequestConverter.buildBulkLoadHFileRequest(familyPaths, targetRegion, true);
      getStub().bulkLoadHFile(null, loadRequest);
      return null;
    }
  };
  RpcRetryingCaller<Void> caller = new RpcRetryingCallerFactory(conf).<Void> newCaller();
  caller.callWithRetries(bulkLoad, Integer.MAX_VALUE);

  // Periodically do compaction to reduce the number of open file handles
  // (10 * 50 = 500 open file handles!).
  if (numBulkLoads.get() % 10 != 0) {
    return;
  }
  RegionServerCallable<Void> compaction =
      new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
    @Override
    public Void call(int callTimeout) throws Exception {
      LOG.debug("compacting " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      AdminProtos.AdminService.BlockingInterface admin =
          conn.getAdmin(getLocation().getServerName());
      CompactRegionRequest compactRequest = RequestConverter.buildCompactRegionRequest(
          getLocation().getRegionInfo().getRegionName(), true, null);
      admin.compactRegion(null, compactRequest);
      numCompactions.incrementAndGet();
      return null;
    }
  };
  caller.callWithRetries(compaction, Integer.MAX_VALUE);
}
项目:ditb    文件:RegionCoprocessorRpcChannel.java   
/**
 * Sends {@code request} as a coprocessor service call for {@code method}, addressed by this
 * channel's {@code row}, and decodes the reply into a message of the prototype's type.
 *
 * <p>The region reported in the reply is stored in {@code lastRegion}.
 *
 * @param controller controller to use for the call; when null, one is obtained from
 *     {@code rpcControllerFactory}
 * @param method descriptor supplying the service full name and the method name of the call
 * @param request message whose serialized bytes become the call payload
 * @param responsePrototype prototype used to build the returned message
 * @return the reply value merged into a new message of the prototype's type, or the
 *     prototype's default instance when the reply carries no value
 * @throws IOException if the call or the decoding of the reply fails
 * @throws IllegalArgumentException if {@code row} is null
 */
@Override
protected Message callExecService(RpcController controller,
    Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
        throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Call: " + method.getName() + ", " + request.toString());
  }

  // The row is what the callable below is constructed with; refuse to go on without it.
  if (row == null) {
    throw new IllegalArgumentException("Missing row property for remote region location");
  }

  // Fall back to a factory-made controller when the caller did not supply one.
  final RpcController rpcController;
  if (controller != null) {
    rpcController = controller;
  } else {
    rpcController = rpcControllerFactory.newController();
  }

  // Assemble the call description one field at a time.
  ClientProtos.CoprocessorServiceCall.Builder callBuilder =
      ClientProtos.CoprocessorServiceCall.newBuilder();
  callBuilder.setRow(ByteStringer.wrap(row));
  callBuilder.setServiceName(method.getService().getFullName());
  callBuilder.setMethodName(method.getName());
  callBuilder.setRequest(request.toByteString());
  final ClientProtos.CoprocessorServiceCall serviceCall = callBuilder.build();

  RegionServerCallable<CoprocessorServiceResponse> regionCallable =
      new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
    @Override
    public CoprocessorServiceResponse call(int callTimeout) throws Exception {
      // Configure only the controller capabilities that this instance actually has.
      if (rpcController instanceof PayloadCarryingRpcController) {
        PayloadCarryingRpcController payloadController =
            (PayloadCarryingRpcController) rpcController;
        payloadController.setPriority(tableName);
      }
      if (rpcController instanceof TimeLimitedRpcController) {
        TimeLimitedRpcController timedController = (TimeLimitedRpcController) rpcController;
        timedController.setCallTimeout(callTimeout);
      }
      byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
      return ProtobufUtil.execService(rpcController, getStub(), serviceCall, targetRegion);
    }
  };
  CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()
      .callWithRetries(regionCallable, operationTimeout);

  // A reply without a value maps to the prototype's default instance.
  final Message response;
  if (!result.getValue().hasValue()) {
    response = responsePrototype.getDefaultInstanceForType();
  } else {
    Message.Builder responseBuilder = responsePrototype.newBuilderForType();
    ProtobufUtil.mergeFrom(responseBuilder, result.getValue().getValue());
    response = responseBuilder.build();
  }

  lastRegion = result.getRegion().getValue().toByteArray();
  if (LOG.isTraceEnabled()) {
    LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
  }
  return response;
}
项目:pbase    文件:TestHRegionServerBulkLoad.java   
/**
 * Performs one bulk-load round: writes one HFile per column family into a fresh directory,
 * asks the region server hosting row "aaa" to bulk load them, and, whenever the bulk-load
 * counter is a multiple of 10, additionally requests a compaction of that region.
 *
 * @throws Exception if creating the files or either remote call fails
 */
public void doAnAction() throws Exception {
  long iteration = numBulkLoads.getAndIncrement();
  String dirName = String.format("bulkLoad_%08d", iteration);
  Path bulkDir = UTIL.getDataTestDirOnTestFS(dirName);

  // Write one HFile per column family; every file carries the iteration number as its value.
  FileSystem testFs = UTIL.getTestFileSystem();
  byte[] value = Bytes.toBytes(String.format("%010d", iteration));
  final List<Pair<byte[], String>> familyPaths =
      new ArrayList<Pair<byte[], String>>(NUM_CFS);
  for (int cf = 0; cf < NUM_CFS; cf++) {
    Path hfile = new Path(bulkDir, family(cf));
    byte[] familyBytes = Bytes.toBytes(family(cf));
    createHFile(testFs, hfile, familyBytes, QUAL, value, 1000);
    familyPaths.add(new Pair<byte[], String>(familyBytes, hfile.toString()));
  }

  // Hand the freshly written files to the region server.
  final HConnection conn = UTIL.getHBaseAdmin().getConnection();
  RegionServerCallable<Void> bulkLoad =
      new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
    @Override
    public Void call(int callTimeout) throws Exception {
      LOG.debug("Going to connect to server " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
      BulkLoadHFileRequest loadRequest =
          RequestConverter.buildBulkLoadHFileRequest(familyPaths, targetRegion, true);
      getStub().bulkLoadHFile(null, loadRequest);
      return null;
    }
  };
  RpcRetryingCaller<Void> caller = new RpcRetryingCallerFactory(conf).<Void> newCaller();
  caller.callWithRetries(bulkLoad, Integer.MAX_VALUE);

  // Periodically do compaction to reduce the number of open file handles
  // (10 * 50 = 500 open file handles!).
  if (numBulkLoads.get() % 10 != 0) {
    return;
  }
  RegionServerCallable<Void> compaction =
      new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
    @Override
    public Void call(int callTimeout) throws Exception {
      LOG.debug("compacting " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      AdminProtos.AdminService.BlockingInterface admin =
          conn.getAdmin(getLocation().getServerName());
      CompactRegionRequest compactRequest = RequestConverter.buildCompactRegionRequest(
          getLocation().getRegionInfo().getRegionName(), true, null);
      admin.compactRegion(null, compactRequest);
      numCompactions.incrementAndGet();
      return null;
    }
  };
  caller.callWithRetries(compaction, Integer.MAX_VALUE);
}
项目:HIndex    文件:TestHRegionServerBulkLoad.java   
/**
 * Performs one bulk-load round: writes one HFile per column family into a fresh directory,
 * asks the region server hosting row "aaa" to bulk load them, and, whenever the bulk-load
 * counter is a multiple of 10, additionally requests a compaction of that region.
 *
 * @throws Exception if creating the files or either remote call fails
 */
public void doAnAction() throws Exception {
  long iteration = numBulkLoads.getAndIncrement();
  String dirName = String.format("bulkLoad_%08d", iteration);
  Path bulkDir = UTIL.getDataTestDirOnTestFS(dirName);

  // Write one HFile per column family; every file carries the iteration number as its value.
  FileSystem testFs = UTIL.getTestFileSystem();
  byte[] value = Bytes.toBytes(String.format("%010d", iteration));
  final List<Pair<byte[], String>> familyPaths =
      new ArrayList<Pair<byte[], String>>(NUM_CFS);
  for (int cf = 0; cf < NUM_CFS; cf++) {
    Path hfile = new Path(bulkDir, family(cf));
    byte[] familyBytes = Bytes.toBytes(family(cf));
    createHFile(testFs, hfile, familyBytes, QUAL, value, 1000);
    familyPaths.add(new Pair<byte[], String>(familyBytes, hfile.toString()));
  }

  // Hand the freshly written files to the region server.
  final HConnection conn = UTIL.getHBaseAdmin().getConnection();
  TableName targetTable = TableName.valueOf(tableName);
  RegionServerCallable<Void> bulkLoad =
      new RegionServerCallable<Void>(conn, targetTable, Bytes.toBytes("aaa")) {
    @Override
    public Void call() throws Exception {
      LOG.debug("Going to connect to server " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
      BulkLoadHFileRequest loadRequest =
          RequestConverter.buildBulkLoadHFileRequest(familyPaths, targetRegion, true);
      getStub().bulkLoadHFile(null, loadRequest);
      return null;
    }
  };
  RpcRetryingCaller<Void> caller = new RpcRetryingCallerFactory(conf).<Void> newCaller();
  caller.callWithRetries(bulkLoad);

  // Periodically do compaction to reduce the number of open file handles
  // (10 * 50 = 500 open file handles!).
  if (numBulkLoads.get() % 10 != 0) {
    return;
  }
  RegionServerCallable<Void> compaction =
      new RegionServerCallable<Void>(conn, targetTable, Bytes.toBytes("aaa")) {
    @Override
    public Void call() throws Exception {
      LOG.debug("compacting " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      AdminProtos.AdminService.BlockingInterface admin =
          conn.getAdmin(getLocation().getServerName());
      CompactRegionRequest compactRequest = RequestConverter.buildCompactRegionRequest(
          getLocation().getRegionInfo().getRegionName(), true, null);
      admin.compactRegion(null, compactRequest);
      numCompactions.incrementAndGet();
      return null;
    }
  };
  caller.callWithRetries(compaction);
}
项目:HIndex    文件:RegionCoprocessorRpcChannel.java   
/**
 * Sends {@code request} as a coprocessor service call for {@code method}, addressed by this
 * channel's {@code row}, and decodes the reply into a message of the prototype's type.
 *
 * <p>A new controller is obtained for the call and its priority is set from {@code table}.
 * The region reported in the reply is stored in {@code lastRegion}.
 *
 * @param method descriptor supplying the service full name and the method name of the call
 * @param request message whose serialized bytes become the call payload
 * @param responsePrototype prototype used to build the returned message
 * @return the reply value merged into a new message of the prototype's type, or the
 *     prototype's default instance when the reply carries no value
 * @throws IOException if the call or the decoding of the reply fails
 * @throws IllegalArgumentException if {@code row} is null
 */
@Override
protected Message callExecService(Descriptors.MethodDescriptor method,
                                Message request, Message responsePrototype)
    throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Call: " + method.getName() + ", " + request.toString());
  }

  // The row is what the callable below is constructed with; refuse to go on without it.
  if (row == null) {
    throw new IllegalArgumentException("Missing row property for remote region location");
  }

  // Assemble the call description one field at a time.
  ClientProtos.CoprocessorServiceCall.Builder callBuilder =
      ClientProtos.CoprocessorServiceCall.newBuilder();
  callBuilder.setRow(HBaseZeroCopyByteString.wrap(row));
  callBuilder.setServiceName(method.getService().getFullName());
  callBuilder.setMethodName(method.getName());
  callBuilder.setRequest(request.toByteString());
  final ClientProtos.CoprocessorServiceCall serviceCall = callBuilder.build();

  // The same controller instance is handed to every invocation of the callable.
  final PayloadCarryingRpcController controller = rpcController.newController();
  controller.setPriority(table);

  RegionServerCallable<CoprocessorServiceResponse> regionCallable =
      new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
        public CoprocessorServiceResponse call() throws Exception {
          // Address the call to the region name of the callable's current location.
          byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
          return ProtobufUtil.execService(getStub(), serviceCall, targetRegion, controller);
        }
      };
  CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()
      .callWithRetries(regionCallable);

  // A reply without a value maps to the prototype's default instance.
  Message response = result.getValue().hasValue()
      ? responsePrototype.newBuilderForType().mergeFrom(result.getValue().getValue()).build()
      : responsePrototype.getDefaultInstanceForType();

  lastRegion = result.getRegion().getValue().toByteArray();
  if (LOG.isTraceEnabled()) {
    LOG.trace("Result is region=" + Bytes.toStringBinary(lastRegion) + ", value=" + response);
  }
  return response;
}
项目:hbase    文件:MetaTableAccessor.java   
/**
 * Performs an atomic multi-mutate operation against the given table.
 *
 * <p>Used by the RSGroup Coprocessor Endpoint, which used to carry a copy/paste of this code.
 * This facility needs to be revealed for CPEP use, or at least for those CPEPs that are on
 * their way to becoming part of core, as is the intent for RSGroup eventually.
 *
 * @param connection connection used to build the callable and to read the timeout settings
 * @param table table whose coprocessor service channel carries the request
 * @param row row passed to the callable and used to obtain the coprocessor channel
 * @param mutations the {@link Put}s and {@link Delete}s to apply together
 * @throws IOException if the call fails; a {@link DoNotRetryIOException} is raised for any
 *     mutation that is neither a {@link Put} nor a {@link Delete}
 */
public static void multiMutate(Connection connection, final Table table, byte[] row,
    final List<Mutation> mutations)
throws IOException {
  debugLogMutations(mutations);
  // TODO: Need rollback!!!!
  // TODO: Need Retry!!!
  // TODO: What for a timeout? Default write timeout? GET FROM HTABLE?
  // TODO: Review when we come through with ProcedureV2.
  RegionServerCallable<MutateRowsResponse,
      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface> mutateCallable =
      new RegionServerCallable<MutateRowsResponse,
        MultiRowMutationProtos.MultiRowMutationService.BlockingInterface>(
            connection, table.getName(), row, null/*RpcController not used in this CPEP!*/) {
    @Override
    protected MutateRowsResponse rpcCall() throws Exception {
      final MutateRowsRequest.Builder requestBuilder = MutateRowsRequest.newBuilder();
      for (Mutation mutation : mutations) {
        // Only puts and deletes can be translated; anything else is rejected outright.
        final ClientProtos.MutationProto.MutationType mutationType;
        if (mutation instanceof Put) {
          mutationType = ClientProtos.MutationProto.MutationType.PUT;
        } else if (mutation instanceof Delete) {
          mutationType = ClientProtos.MutationProto.MutationType.DELETE;
        } else {
          throw new DoNotRetryIOException("multi in MetaEditor doesn't support "
            + mutation.getClass().getName());
        }
        requestBuilder.addMutationRequest(ProtobufUtil.toMutation(mutationType, mutation));
      }
      // The call to #prepare that ran before this invocation will have populated HRegionLocation.
      HRegionLocation regionLocation = getLocation();
      byte[] regionName = regionLocation.getRegionInfo().getRegionName();
      RegionSpecifier regionSpecifier = ProtobufUtil.buildRegionSpecifier(
          RegionSpecifierType.REGION_NAME, regionName);
      requestBuilder.setRegion(regionSpecifier);
      // No RpcController is handed to the endpoint stub here; null is passed in its place.
      return getStub().mutateRows(null, requestBuilder.build());
    }

    @Override
    // Called on the end of the super.prepare call. Set the stub.
    protected void setStubByServiceName(ServerName serviceName/*Ignored*/) throws IOException {
      CoprocessorRpcChannel rpcChannel = table.coprocessorService(getRow());
      setStub(MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(rpcChannel));
    }
  };
  // Write timeout, falling back to the general rpc timeout and then to its default.
  int rpcTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  int writeTimeout = connection.getConfiguration().getInt(
      HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeout);
  // The region location should be cached in connection. Call prepare so this callable picks
  // up the region location (see super.prepare method).
  mutateCallable.prepare(false);
  mutateCallable.call(writeTimeout);
}
项目:PyroDB    文件:TestHRegionServerBulkLoad.java   
/**
 * Performs one bulk-load round: writes one HFile per column family into a fresh directory,
 * asks the region server hosting row "aaa" to bulk load them, and, whenever the bulk-load
 * counter is a multiple of 10, additionally requests a compaction of that region.
 *
 * @throws Exception if creating the files or either remote call fails
 */
public void doAnAction() throws Exception {
  long iteration = numBulkLoads.getAndIncrement();
  String dirName = String.format("bulkLoad_%08d", iteration);
  Path bulkDir = UTIL.getDataTestDirOnTestFS(dirName);

  // Write one HFile per column family; every file carries the iteration number as its value.
  FileSystem testFs = UTIL.getTestFileSystem();
  byte[] value = Bytes.toBytes(String.format("%010d", iteration));
  final List<Pair<byte[], String>> familyPaths =
      new ArrayList<Pair<byte[], String>>(NUM_CFS);
  for (int cf = 0; cf < NUM_CFS; cf++) {
    Path hfile = new Path(bulkDir, family(cf));
    byte[] familyBytes = Bytes.toBytes(family(cf));
    createHFile(testFs, hfile, familyBytes, QUAL, value, 1000);
    familyPaths.add(new Pair<byte[], String>(familyBytes, hfile.toString()));
  }

  // Hand the freshly written files to the region server.
  final HConnection conn = UTIL.getHBaseAdmin().getConnection();
  TableName targetTable = TableName.valueOf(tableName);
  RegionServerCallable<Void> bulkLoad =
      new RegionServerCallable<Void>(conn, targetTable, Bytes.toBytes("aaa")) {
    @Override
    public Void call(int callTimeout) throws Exception {
      LOG.debug("Going to connect to server " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
      BulkLoadHFileRequest loadRequest =
          RequestConverter.buildBulkLoadHFileRequest(familyPaths, targetRegion, true);
      getStub().bulkLoadHFile(null, loadRequest);
      return null;
    }
  };
  RpcRetryingCaller<Void> caller = new RpcRetryingCallerFactory(conf).<Void> newCaller();
  caller.callWithRetries(bulkLoad, Integer.MAX_VALUE);

  // Periodically do compaction to reduce the number of open file handles
  // (10 * 50 = 500 open file handles!).
  if (numBulkLoads.get() % 10 != 0) {
    return;
  }
  RegionServerCallable<Void> compaction =
      new RegionServerCallable<Void>(conn, targetTable, Bytes.toBytes("aaa")) {
    @Override
    public Void call(int callTimeout) throws Exception {
      LOG.debug("compacting " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      AdminProtos.AdminService.BlockingInterface admin =
          conn.getAdmin(getLocation().getServerName());
      CompactRegionRequest compactRequest = RequestConverter.buildCompactRegionRequest(
          getLocation().getRegionInfo().getRegionName(), true, null);
      admin.compactRegion(null, compactRequest);
      numCompactions.incrementAndGet();
      return null;
    }
  };
  caller.callWithRetries(compaction, Integer.MAX_VALUE);
}
项目:c5    文件:TestHRegionServerBulkLoad.java   
/**
 * Performs one bulk-load round: writes one HFile per column family into a fresh directory,
 * asks the region server hosting row "aaa" to bulk load them, and, whenever the bulk-load
 * counter is a multiple of 10, additionally requests a compaction of that region.
 *
 * @throws Exception if creating the files or either remote call fails
 */
public void doAnAction() throws Exception {
  long iteration = numBulkLoads.getAndIncrement();
  String dirName = String.format("bulkLoad_%08d", iteration);
  Path bulkDir = UTIL.getDataTestDirOnTestFS(dirName);

  // Write one HFile per column family; every file carries the iteration number as its value.
  FileSystem testFs = UTIL.getTestFileSystem();
  byte[] value = Bytes.toBytes(String.format("%010d", iteration));
  final List<Pair<byte[], String>> familyPaths =
      new ArrayList<Pair<byte[], String>>(NUM_CFS);
  for (int cf = 0; cf < NUM_CFS; cf++) {
    Path hfile = new Path(bulkDir, family(cf));
    byte[] familyBytes = Bytes.toBytes(family(cf));
    createHFile(testFs, hfile, familyBytes, QUAL, value, 1000);
    familyPaths.add(new Pair<byte[], String>(familyBytes, hfile.toString()));
  }

  // Hand the freshly written files to the region server.
  final HConnection conn = UTIL.getHBaseAdmin().getConnection();
  TableName targetTable = TableName.valueOf(tableName);
  RegionServerCallable<Void> bulkLoad =
      new RegionServerCallable<Void>(conn, targetTable, Bytes.toBytes("aaa")) {
    @Override
    public Void call() throws Exception {
      LOG.debug("Going to connect to server " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      byte[] targetRegion = getLocation().getRegionInfo().getRegionName();
      BulkLoadHFileRequest loadRequest =
          RequestConverter.buildBulkLoadHFileRequest(familyPaths, targetRegion, true);
      getStub().bulkLoadHFile(null, loadRequest);
      return null;
    }
  };
  RpcRetryingCaller<Void> caller = new RpcRetryingCallerFactory(conf).<Void> newCaller();
  caller.callWithRetries(bulkLoad);

  // Periodically do compaction to reduce the number of open file handles
  // (10 * 50 = 500 open file handles!).
  if (numBulkLoads.get() % 10 != 0) {
    return;
  }
  RegionServerCallable<Void> compaction =
      new RegionServerCallable<Void>(conn, targetTable, Bytes.toBytes("aaa")) {
    @Override
    public Void call() throws Exception {
      LOG.debug("compacting " + getLocation() + " for row "
          + Bytes.toStringBinary(getRow()));
      AdminProtos.AdminService.BlockingInterface admin =
          conn.getAdmin(getLocation().getServerName());
      CompactRegionRequest compactRequest = RequestConverter.buildCompactRegionRequest(
          getLocation().getRegionInfo().getRegionName(), true, null);
      admin.compactRegion(null, compactRequest);
      numCompactions.incrementAndGet();
      return null;
    }
  };
  caller.callWithRetries(compaction);
}