Java 类org.apache.hadoop.hbase.wal.WALProvider.Writer 实例源码

项目:ditb    文件:WALSplitter.java   
/**
 * Opens a writer on the split-edits path computed for {@code entry}, deleting any file that
 * already exists at that path first.
 *
 * @return the writer paired with its path, or {@code null} when no edits path is available.
 *         The caller should close the writer.
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
  final Path editsPath = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (editsPath == null) {
    return null;
  }
  final boolean staleFilePresent = fs.exists(editsPath);
  if (staleFilePresent) {
    final long staleLength = fs.getFileStatus(editsPath).getLen();
    LOG.warn("Found old edits file. It could be the "
        + "result of a previous failed split attempt. Deleting " + editsPath + ", length="
        + staleLength);
    final boolean removed = fs.delete(editsPath, false);
    if (!removed) {
      // Best effort only: the failure is reported and writer creation still proceeds.
      LOG.warn("Failed delete of old " + editsPath);
    }
  }
  final Writer editsWriter = createWriter(editsPath);
  LOG.debug("Creating writer path=" + editsPath);
  return new WriterAndPath(editsPath, editsWriter);
}
项目:ditb    文件:IntegrationTestIngestWithEncryption.java   
/**
 * Prepares the cluster for the encryption ingest test. When not running against a distributed
 * cluster, the required HFile version, key provider and secure WAL reader/writer settings are
 * injected into the configuration. If the encryption self-check fails, the method returns
 * without starting the cluster and leaves {@code initialized} unset.
 */
@Override
public void setUpCluster() throws Exception {
  util = getTestingUtil(null);
  Configuration conf = util.getConfiguration();
  if (!util.isDistributedCluster()) {
    // Inject required configuration if we are not running in distributed mode
    conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
      Reader.class);
    conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
      Writer.class);
    conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
  }
  // Check if the cluster configuration can support this test
  try {
    EncryptionTest.testEncryption(conf, "AES", null);
  } catch (Exception e) {
    // Pass the exception to the logger so the reason for skipping is visible in the output.
    LOG.warn("Encryption configuration test did not pass, skipping test", e);
    return;
  }
  super.setUpCluster();
  initialized = true;
}
项目:pbase    文件:WALSplitter.java   
/**
 * Builds a writer for the split-edits file that belongs to {@code entry}. A file left at that
 * path is logged and deleted before the new writer is created.
 *
 * @return the path together with a writer for it, or {@code null} if no edits path was
 *         resolved. The caller should close the writer.
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
  Path target = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (target == null) {
    return null;
  }
  if (fs.exists(target)) {
    String staleNotice = "Found old edits file. It could be the "
        + "result of a previous failed split attempt. Deleting " + target + ", length="
        + fs.getFileStatus(target).getLen();
    LOG.warn(staleNotice);
    boolean deleted = fs.delete(target, false);
    if (!deleted) {
      LOG.warn("Failed delete of old " + target);
    }
  }
  Writer targetWriter = createWriter(target);
  LOG.info("Creating writer path=" + target + " region=" + Bytes.toStringBinary(region));
  WriterAndPath result = new WriterAndPath(target, targetWriter);
  return result;
}
项目:pbase    文件:TestWALSplit.java   
/**
 * One-time fixture: configures the instrumented WAL writer, append support, a fake
 * user-to-group mapping for the "robber" and "zombie" users, and starts a two-node mini DFS
 * cluster.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  conf = TEST_UTIL.getConfiguration();
  // Write WALs through the instrumented writer so tests can toggle injected failures.
  conf.setClass("hbase.regionserver.hlog.writer.impl",
    InstrumentedLogWriter.class, Writer.class);
  conf.setBoolean("dfs.support.broken.append", true);
  conf.setBoolean("dfs.support.append", true);
  conf.setInt("dfs.heartbeat.interval", 1);
  // This is how short-circuit reads are turned off currently.  TODO: Fix.  Should read config.
  System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

  // Register both derived test users under the shared group in the configuration.
  ROBBER = User.getCurrent().getName() + "-robber";
  ZOMBIE = User.getCurrent().getName() + "-zombie";
  Map<String, String []> userToGroups = new HashMap<String, String []>(2);
  userToGroups.put(ROBBER, GROUP);
  userToGroups.put(ZOMBIE, GROUP);
  DFSTestUtil.updateConfWithFakeGroupMapping(conf, userToGroups);

  TEST_UTIL.startMiniDFSCluster(2);
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Appends one extra entry for a "break" region to the WAL left open, then turns on the
 * instrumented writer's failure flag and expects {@code WALSplitter.split} to fail with the
 * instrumented {@link IOException}.
 */
@Test(timeout=300000, expected = IOException.class)
public void testSplitWillFailIfWritingToRegionFails() throws Exception {
  //leave 5th log open so we could append the "trap"
  Writer writer = generateWALs(4);
  useDifferentDFSClient();

  String region = "break";
  Path regiondir = new Path(TABLEDIR, region);
  fs.mkdirs(regiondir);

  InstrumentedLogWriter.activateFailure = false;
  // Encode the row key with Bytes.toBytes rather than the platform default charset, matching
  // how the region name is encoded on the same call.
  appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
      Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
  writer.close();

  try {
    InstrumentedLogWriter.activateFailure = true;
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
  } catch (IOException e) {
    assertTrue(e.getMessage().
      contains("This exception is instrumented and should only be thrown for testing"));
    // Rethrow so the expected-exception check on @Test is satisfied.
    throw e;
  } finally {
    // Always switch failure injection back off so later tests are unaffected.
    InstrumentedLogWriter.activateFailure = false;
  }
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Creates {@code writers} WAL files under WALDIR and appends {@code entries} edits per region
 * in REGIONS to each of them.
 *
 * @param writers number of WAL files to create
 * @param entries number of edits written per region into every WAL
 * @param leaveOpen index to leave un-closed. -1 to close all.
 * @return the writer that's still open, or null if all were closed.
 */
private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
  makeRegionDirs(REGIONS);
  fs.mkdirs(WALDIR);
  Writer [] ws = new Writer[writers];
  int seq = 0;
  for (int i = 0; i < writers; i++) {
    ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
    for (int j = 0; j < entries; j++) {
      int prefix = 0;
      for (String region : REGIONS) {
        String row_key = region + prefix++ + i + j;
        // Encode explicitly as UTF-8 so the generated keys do not depend on the platform
        // default charset.
        appendEntry(ws[i], TABLE_NAME,
            region.getBytes(java.nio.charset.StandardCharsets.UTF_8),
            row_key.getBytes(java.nio.charset.StandardCharsets.UTF_8), FAMILY, QUALIFIER,
            VALUE, seq++);
      }
    }
    if (i != leaveOpen) {
      ws[i].close();
      LOG.info("Closing writer " + i);
    }
  }
  if (leaveOpen < 0 || leaveOpen >= writers) {
    return null;
  }
  return ws[leaveOpen];
}
项目:hbase    文件:WALSplitter.java   
/**
 * Creates a writer for the split-edits file derived from {@code entry} and the name of the
 * file currently being split. A file already present at that path is logged and deleted first.
 *
 * @return the path, its writer and the entry's sequence id wrapped together, or {@code null}
 *         when no edits path is available. The caller should close the writer.
 */
WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
  Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
  if (regionedits == null) {
    return null;
  }
  if (fs.exists(regionedits)) {
    // Parameterized like the other log calls in this method; the rendered text is unchanged.
    LOG.warn("Found old edits file. It could be the result of a previous failed split attempt. "
        + "Deleting {}, length={}", regionedits, fs.getFileStatus(regionedits).getLen());
    if (!fs.delete(regionedits, false)) {
      LOG.warn("Failed delete of old {}", regionedits);
    }
  }
  Writer w = createWriter(regionedits);
  LOG.debug("Creating writer path={}", regionedits);
  return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
}
项目:hbase    文件:TestWALSplit.java   
/**
 * One-time fixture: installs the instrumented WAL writer, registers a fake user-to-group
 * mapping for the "robber" and "zombie" users, and starts a two-node mini DFS cluster.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  conf = TEST_UTIL.getConfiguration();
  conf.setClass("hbase.regionserver.hlog.writer.impl",
      InstrumentedLogWriter.class, Writer.class);
  conf.setInt("dfs.heartbeat.interval", 1);
  // This is how short-circuit reads are turned off currently.  TODO: Fix.  Should read config.
  System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

  // Both derived test users are mapped to the same group.
  ROBBER = User.getCurrent().getName() + "-robber";
  ZOMBIE = User.getCurrent().getName() + "-zombie";
  Map<String, String []> groupsByUser = new HashMap<>(2);
  groupsByUser.put(ROBBER, GROUP);
  groupsByUser.put(ZOMBIE, GROUP);
  DFSTestUtil.updateConfWithFakeGroupMapping(conf, groupsByUser);

  TEST_UTIL.startMiniDFSCluster(2);
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Writes a "trap" entry for a region named "break" into the WAL left open, then enables the
 * instrumented writer's failure flag and expects the split to fail with the instrumented
 * {@link IOException}.
 */
@Test(timeout=300000, expected = IOException.class)
public void testSplitWillFailIfWritingToRegionFails() throws Exception {
  // The 5th log stays open so the "trap" entry can be appended to it.
  final Writer trapWriter = generateWALs(4);
  useDifferentDFSClient();

  final String trapRegion = "break";
  fs.mkdirs(new Path(TABLEDIR, trapRegion));

  InstrumentedLogWriter.activateFailure = false;
  final byte[] trapRegionBytes = Bytes.toBytes(trapRegion);
  final byte[] trapRow = Bytes.toBytes("r" + 999);
  appendEntry(trapWriter, TABLE_NAME, trapRegionBytes, trapRow, FAMILY, QUALIFIER, VALUE, 0);
  trapWriter.close();

  InstrumentedLogWriter.activateFailure = true;
  try {
    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
  } catch (IOException splitFailure) {
    final String message = splitFailure.getMessage();
    assertTrue(
        message.contains("This exception is instrumented and should only be thrown for testing"));
    // Propagate so the expected-exception check on @Test passes.
    throw splitFailure;
  } finally {
    InstrumentedLogWriter.activateFailure = false;
  }
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Appends a single compaction marker for {@code hri} to {@code w} and syncs the writer.
 *
 * @param w writer the marker is appended to
 * @param hri region the compaction descriptor is built for
 * @param inputs compaction input file names recorded in the descriptor
 * @param output compaction output file name recorded in the descriptor
 */
private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
    String output) throws IOException {
  WALProtos.CompactionDescriptor.Builder builder = WALProtos.CompactionDescriptor.newBuilder();
  builder.setTableName(ByteString.copyFrom(hri.getTable().toBytes()));
  builder.setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()));
  builder.setRegionName(ByteString.copyFrom(hri.getRegionName()));
  builder.setFamilyName(ByteString.copyFrom(FAMILY));
  builder.setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY));
  builder.addAllCompactionInput(Arrays.asList(inputs));
  builder.addCompactionOutput(output);

  final WALEdit compactionEdit = WALEdit.createCompaction(hri, builder.build());
  final WALKeyImpl compactionKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
      EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
  final Entry marker = new Entry(compactionKey, compactionEdit);
  w.append(marker);
  w.sync();
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Appends a REGION_OPEN event marker for {@code region} to {@code w} and syncs the writer.
 *
 * @param w writer the marker is appended to
 * @param region region name used for the descriptor, the marker row and the WAL key
 */
private static void appendRegionEvent(Writer w, String region) throws IOException {
  WALProtos.RegionEventDescriptor openEvent = ProtobufUtil.toRegionEventDescriptor(
      WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
      TABLE_NAME.toBytes(),
      Bytes.toBytes(region),
      Bytes.toBytes(String.valueOf(region.hashCode())),
      1,
      ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
  // The same timestamp is used for the marker cell and the WAL key.
  final long now = EnvironmentEdgeManager.currentTime();
  final KeyValue markerCell = new KeyValue(Bytes.toBytes(region), WALEdit.METAFAMILY,
      WALEdit.REGION_EVENT, now, openEvent.toByteArray());
  final WALKeyImpl markerKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, now,
      HConstants.DEFAULT_CLUSTER_ID);
  final WALEdit markerEdit = new WALEdit().add(markerCell);
  w.append(new Entry(markerKey, markerEdit));
  w.sync();
}
项目:hbase    文件:IntegrationTestIngestWithEncryption.java   
/**
 * Sets up the cluster for the encryption ingest test. Outside of distributed mode the needed
 * HFile version, key provider and secure WAL reader/writer settings are injected first. When
 * the encryption self-check fails the cluster is not started and {@code initialized} stays
 * unset.
 */
@Override
public void setUpCluster() throws Exception {
  util = getTestingUtil(null);
  final Configuration clusterConf = util.getConfiguration();
  final boolean distributed = util.isDistributedCluster();
  if (!distributed) {
    // Not in distributed mode, so the required configuration has to be injected here.
    clusterConf.setInt(HFile.FORMAT_VERSION_KEY, 3);
    clusterConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
      KeyProviderForTesting.class.getName());
    clusterConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
    clusterConf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
      Reader.class);
    clusterConf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
      Writer.class);
    clusterConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
  }
  // Verify that the cluster configuration can support this test before starting anything.
  try {
    EncryptionTest.testEncryption(clusterConf, "AES", null);
  } catch (Exception checkFailure) {
    LOG.warn("Encryption configuration test did not pass, skipping test", checkFailure);
    return;
  }
  super.setUpCluster();
  initialized = true;
}
项目:ditb    文件:WALFactory.java   
/**
 * Static variant for callers that hold no WALFactory; if you already have a WALFactory, favor
 * the instance method.
 * @return a writer that won't overwrite files. Caller must close.
 */
@VisibleForTesting
public static Writer createWALWriter(final FileSystem fs, final Path path,
    final Configuration configuration)
    throws IOException {
  final Writer walWriter = DefaultWALProvider.createWriter(configuration, fs, path, false);
  return walWriter;
}
项目:ditb    文件:TestSecureWALReplay.java   
/**
 * Enables WAL encryption with the testing key provider and the secure protobuf reader/writer,
 * then delegates to {@code TestWALReplay.setUpBeforeClass()}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final Configuration replayConf = TestWALReplay.TEST_UTIL.getConfiguration();
  // Key material settings.
  replayConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
  replayConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
  // Secure WAL reader and writer implementations.
  replayConf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
    Writer.class);
  replayConf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
    Reader.class);
  replayConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
  TestWALReplay.setUpBeforeClass();
}
项目:pbase    文件:WALFactory.java   
/**
 * Creates a WAL writer without needing a WALFactory instance. If you already have a
 * WALFactory, you should favor the instance method.
 * @return a writer that won't overwrite files. Caller must close.
 */
@VisibleForTesting
public static Writer createWALWriter(final FileSystem fs, final Path path,
    final Configuration configuration)
    throws IOException {
  final boolean overwrite = false;
  return DefaultWALProvider.createWriter(configuration, fs, path, overwrite);
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Runs as {@code this.user}: generates the WALs, keeps the last one open, keeps appending to
 * it through {@code loop}, and finally expects closing that writer to fail.
 */
private void doWriting() throws IOException, InterruptedException {
  this.user.runAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      // generateWALs leaves open the WAL whose index is passed in; here that is the last one.
      final int lastWalIndex = numOfWriters - 1;
      // numOfWriters files are written, each with ENTRIES entries, for a total of
      // numOfWriters * ENTRIES added per column family in the region.
      final Writer openWriter;
      try {
        openWriter = generateWALs(numOfWriters, ENTRIES, lastWalIndex);
      } catch (IOException generateFailure) {
        throw new RuntimeException("Failed", generateFailure);
      }
      // Bring the counter up to date with all edits written so far.
      editsCount.addAndGet(numOfWriters * ENTRIES);
      loop(openWriter);
      // If we've been interrupted, then things should have shifted out from under us,
      // so closing should error.
      try {
        openWriter.close();
        fail("Writing closing after parsing should give an error.");
      } catch (IOException closeFailure) {
        LOG.debug("ignoring error when closing final writer.", closeFailure);
      }
      return null;
    }
  });
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Keeps appending one entry at a time to {@code writer} until {@code stop} is set or an append
 * fails. A {@link RemoteException} ends the loop quietly; any other {@link IOException} fails
 * the test.
 *
 * @param writer the open WAL writer to append to
 */
private void loop(final Writer writer) {
  byte [] regionBytes = Bytes.toBytes(this.region);
  while (!stop.get()) {
    try {
      // Bytes.toBytes avoids the platform default charset that String.getBytes() would use.
      long seq = appendEntry(writer, TABLE_NAME, regionBytes,
          Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
      long count = editsCount.incrementAndGet();
      LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
      try {
        Thread.sleep(1);
      } catch (InterruptedException ignored) {
        // Deliberately ignored: the loop is ended by the stop flag or a failed append.
      }
    } catch (IOException ex) {
      LOG.error(getName() + " ex " + ex.toString());
      if (ex instanceof RemoteException) {
        LOG.error("Juliet: got RemoteException " + ex.getMessage() +
          " while writing " + (editsCount.get() + 1));
      } else {
        LOG.error(getName() + " failed to write....at " + editsCount.get());
        fail("Failed to write " + editsCount.get());
      }
      break;
    } catch (Throwable t) {
      LOG.error(getName() + " HOW? " + t);
      LOG.debug("exception details", t);
      break;
    }
  }
  LOG.info(getName() + " Writer exiting");
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Appends one test entry built from the given coordinates to {@code writer}, then syncs it.
 *
 * @return {@code seq}, unchanged
 */
public static long appendEntry(Writer writer, TableName table, byte[] region,
                        byte[] row, byte[] family, byte[] qualifier,
                        byte[] value, long seq)
        throws IOException {
  final String threadName = Thread.currentThread().getName();
  LOG.info(threadName + " append");
  writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
  LOG.info(threadName + " sync");
  writer.sync();
  return seq;
}
项目:pbase    文件:TestSecureWALReplay.java   
/**
 * Switches the shared test configuration to encrypted WALs (testing key provider, secure
 * protobuf reader and writer) before running {@code TestWALReplay.setUpBeforeClass()}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final Configuration secureConf = TestWALReplay.TEST_UTIL.getConfiguration();
  final String keyProvider = KeyProviderForTesting.class.getName();
  secureConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, keyProvider);
  secureConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
  secureConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
  secureConf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
    Reader.class);
  secureConf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
    Writer.class);
  TestWALReplay.setUpBeforeClass();
}
项目:hbase    文件:WALFactory.java   
/**
 * Creates a WAL writer through {@code FSHLogProvider}. If you already have a WALFactory, you
 * should favor the instance method.
 * @return a writer that won't overwrite files. Caller must close.
 */
@VisibleForTesting
public static Writer createWALWriter(final FileSystem fs, final Path path,
    final Configuration configuration)
    throws IOException {
  final Writer created = FSHLogProvider.createWriter(configuration, fs, path, false);
  return created;
}
项目:hbase    文件:FSHLog.java   
/**
 * Hook that lets subclasses inject different writers without having to extend other methods
 * like rollWriter(). A {@code ProtobufLogWriter} is passed to {@code preemptiveSync} before
 * being returned.
 * @return Writer instance
 */
@Override
protected Writer createWriterInstance(final Path path) throws IOException {
  final Writer created = FSHLogProvider.createWriter(conf, fs, path, false);
  if (!(created instanceof ProtobufLogWriter)) {
    return created;
  }
  preemptiveSync((ProtobufLogWriter) created);
  return created;
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Runs as {@code this.user}: writes the WALs, leaves the last one open, appends to it via
 * {@code loop}, and then expects closing that writer to throw.
 */
private void doWriting() throws IOException, InterruptedException {
  this.user.runAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      // Index of the WAL that generateWALs is asked to leave open.
      final int keepOpenIndex = numOfWriters - 1;
      final Writer survivor;
      try {
        // Writes numOfWriters files each with ENTRIES entries, i.e. numOfWriters * ENTRIES
        // added per column family in the region.
        survivor = generateWALs(numOfWriters, ENTRIES, keepOpenIndex);
      } catch (IOException cause) {
        throw new RuntimeException("Failed", cause);
      }
      // Account for every edit written so far.
      editsCount.addAndGet(numOfWriters * ENTRIES);
      loop(survivor);
      // If we've been interrupted, things should have shifted out from under us and the
      // close should error.
      try {
        survivor.close();
        fail("Writing closing after parsing should give an error.");
      } catch (IOException expected) {
        LOG.debug("ignoring error when closing final writer.", expected);
      }
      return null;
    }
  });
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Appends entries to {@code writer} one at a time until {@code stop} is set or an append
 * throws. A {@link RemoteException} only gets logged; any other {@link IOException} fails the
 * test.
 *
 * @param writer the open WAL writer to append to
 */
private void loop(final Writer writer) {
  final byte[] regionName = Bytes.toBytes(this.region);
  while (!stop.get()) {
    try {
      final byte[] row = Bytes.toBytes("r" + editsCount.get());
      final long seq =
          appendEntry(writer, TABLE_NAME, regionName, row, regionName, QUALIFIER, VALUE, 0);
      final long written = editsCount.incrementAndGet();
      LOG.info(getName() + " sync count=" + written + ", seq=" + seq);
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) {
        // Nothing to do here; the loop condition decides when to stop.
      }
    } catch (IOException ioe) {
      LOG.error(getName() + " ex " + ioe.toString());
      if (!(ioe instanceof RemoteException)) {
        LOG.error(getName() + " failed to write....at " + editsCount.get());
        fail("Failed to write " + editsCount.get());
      } else {
        LOG.error("Juliet: got RemoteException " + ioe.getMessage() +
            " while writing " + (editsCount.get() + 1));
      }
      break;
    } catch (Throwable unexpected) {
      LOG.error(getName() + " HOW? " + unexpected);
      LOG.debug("exception details", unexpected);
      break;
    }
  }
  LOG.info(getName() + " Writer exiting");
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Writes one WAL with 10 test edits, 10 region markers and a compaction marker, splits it,
 * and checks that the split log for the region holds the 10 edits plus the compaction marker.
 */
@Test (timeout=300000)
public void testSplitLeavesCompactionEventsEdits() throws IOException{
  final RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
  REGIONS.clear();
  REGIONS.add(regionInfo.getEncodedName());
  final Path tableDir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
  final Path regionDir = new Path(tableDir, regionInfo.getEncodedName());
  LOG.info("Creating region directory: " + regionDir);
  assertTrue(fs.mkdirs(regionDir));

  // One writer, 10 entries, writer 0 left open, 10 region events.
  final Writer openWal = generateWALs(1, 10, 0, 10);
  final String[] compactionInputs = new String[]{"file1", "file2", "file3"};
  final String compactionOutput = "file4";
  appendCompactionEvent(openWal, regionInfo, compactionInputs, compactionOutput);
  openWal.close();

  useDifferentDFSClient();
  WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

  final Path archivedLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
  // original log should have 10 test edits, 10 region markers, 1 compaction marker
  assertEquals(21, countWAL(archivedLog));

  final Path[] regionSplitLogs =
      getLogForRegion(HBASEDIR, TABLE_NAME, regionInfo.getEncodedName());
  assertEquals(1, regionSplitLogs.length);

  final Path regionSplitLog = regionSplitLogs[0];
  assertFalse("edits differ after split", logsAreEqual(archivedLog, regionSplitLog));
  // split log should have 10 test edits plus 1 compaction marker
  assertEquals(11, countWAL(regionSplitLog));
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Creates {@code writers} WAL files under WALDIR, appending {@code entries} edits per region in
 * REGIONS to each. After each edit a region event is also appended until {@code regionEvents}
 * of them have been written in total.
 *
 * @param writers number of WAL files to create
 * @param entries number of edits per region written into every WAL
 * @param leaveOpen index to leave un-closed. -1 to close all.
 * @param regionEvents total number of region events to append across all WALs
 * @return the writer that's still open, or null if all were closed.
 */
private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) throws IOException {
  makeRegionDirs(REGIONS);
  fs.mkdirs(WALDIR);
  final Writer[] walWriters = new Writer[writers];
  int nextSeq = 0;
  int regionEventsLeft = regionEvents;
  for (int w = 0; w < writers; w++) {
    walWriters[w] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + w));
    for (int e = 0; e < entries; e++) {
      int regionIndex = 0;
      for (String region : REGIONS) {
        final String rowKey = region + regionIndex + w + e;
        regionIndex++;
        appendEntry(walWriters[w], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(rowKey),
            FAMILY, QUALIFIER, VALUE, nextSeq);
        nextSeq++;

        if (regionEventsLeft > 0) {
          regionEventsLeft--;
          appendRegionEvent(walWriters[w], region);
        }
      }
    }
    if (w != leaveOpen) {
      walWriters[w].close();
      LOG.info("Closing writer " + w);
    }
  }
  final boolean hasOpenWriter = leaveOpen >= 0 && leaveOpen < writers;
  return hasOpenWriter ? walWriters[leaveOpen] : null;
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Builds a test entry from the given coordinates, appends it to {@code writer} and syncs.
 *
 * @return {@code seq}, unchanged
 */
public static long appendEntry(Writer writer, TableName table, byte[] region,
    byte[] row, byte[] family, byte[] qualifier,
    byte[] value, long seq)
    throws IOException {
  final String caller = Thread.currentThread().getName();
  LOG.info(caller + " append");
  writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
  LOG.info(caller + " sync");
  writer.sync();
  return seq;
}
项目:hbase    文件:TestWALSplit.java   
/**
 * Creates a WAL file named WAL_FILE_PREFIX + {@code suffix} in WALDIR without writing any
 * entries to it.
 *
 * @param suffix appended to the WAL file prefix to form the file name
 * @param closeFile whether the new writer is closed before returning
 */
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
  final Path emptyWalPath = new Path(WALDIR, WAL_FILE_PREFIX + suffix);
  final Writer emptyWriter = WALFactory.createWALWriter(fs, emptyWalPath, conf);
  if (!closeFile) {
    // The caller asked for the file to stay open.
    return;
  }
  emptyWriter.close();
}
项目:hbase    文件:TestSecureWALReplay.java   
/**
 * Configures encrypted WALs (testing key provider, secure protobuf reader and writer) and
 * then runs {@code AbstractTestWALReplay.setUpBeforeClass()}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final Configuration walConf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
  walConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
  walConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
  walConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
  walConf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
    Reader.class);
  walConf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
    Writer.class);
  AbstractTestWALReplay.setUpBeforeClass();
}
项目:ditb    文件:WALFactory.java   
/**
 * Creates a recovered-edits writer using this factory's configuration. Should be
 * package-private; it is visible for recovery testing.
 * @return an overwritable writer for recovered edits. caller should close.
 */
@VisibleForTesting
public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
    throws IOException {
  final Writer recoveredEditsWriter = DefaultWALProvider.createWriter(conf, fs, path, true);
  return recoveredEditsWriter;
}
项目:ditb    文件:WALFactory.java   
/**
 * Static variant taking an explicit configuration. If you already have a WALFactory, you
 * should favor the instance method.
 * @return a Writer that will overwrite files. Caller must close.
 */
static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
    final Configuration configuration)
    throws IOException {
  final boolean overwrite = true;
  return DefaultWALProvider.createWriter(configuration, fs, path, overwrite);
}
项目:ditb    文件:WALSplitter.java   
/**
 * Creates the {@link Writer} used for writing log splits by asking the WAL factory for a
 * recovered-edits writer on {@code logfile}.
 * @return a new Writer instance, caller should close
 */
protected Writer createWriter(Path logfile)
    throws IOException {
  final Writer splitWriter = walFactory.createRecoveredEditsWriter(fs, logfile);
  return splitWriter;
}
项目:ditb    文件:WALSplitter.java   
/** Pairs a writer with the path it writes to. */
WriterAndPath(final Path p, final Writer w) {
  this.w = w;
  this.p = p;
}
项目:pbase    文件:WALFactory.java   
/**
 * Instance variant that uses this factory's configuration. Should be package-private, but is
 * visible for recovery testing.
 * @return an overwritable writer for recovered edits. caller should close.
 */
@VisibleForTesting
public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
    throws IOException {
  final boolean overwrite = true;
  return DefaultWALProvider.createWriter(conf, fs, path, overwrite);
}
项目:pbase    文件:WALFactory.java   
/**
 * Creates a recovered-edits writer from an explicit configuration. If you already have a
 * WALFactory, you should favor the instance method.
 * @return a Writer that will overwrite files. Caller must close.
 */
static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
    final Configuration configuration)
    throws IOException {
  final Writer recoveredEditsWriter =
      DefaultWALProvider.createWriter(configuration, fs, path, true);
  return recoveredEditsWriter;
}
项目:pbase    文件:WALSplitter.java   
/**
 * Obtains a new {@link Writer} for writing log splits from the WAL factory's recovered-edits
 * writer method.
 * @return a new Writer instance, caller should close
 */
protected Writer createWriter(Path logfile)
    throws IOException {
  final Writer recoveredEditsWriter = walFactory.createRecoveredEditsWriter(fs, logfile);
  return recoveredEditsWriter;
}
项目:pbase    文件:WALSplitter.java   
/**
 * Stores the given path and writer.
 */
WriterAndPath(final Path p, final Writer w) {
  this.w = w;
  this.p = p;
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Splits the largest generated WAL with a splitter whose writers throw an injected
 * {@link IOException} on append, and expects that exception to surface from
 * {@code splitLogFile}.
 */
@Test (timeout=300000)
public void testIOEOnOutputThread() throws Exception {
  conf.setBoolean(HBASE_SKIP_ERRORS, false);

  generateWALs(-1);
  useDifferentDFSClient();
  FileStatus[] logfiles = fs.listStatus(WALDIR);
  assertTrue("There should be some log file",
    logfiles != null && logfiles.length > 0);
  // wals with no entries (like the one we don't use in the factory)
  // won't cause a failure since nothing will ever be written.
  // pick the largest one since it's most likely to have entries.
  int largestLogFile = 0;
  long largestSize = 0;
  for (int i = 0; i < logfiles.length; i++) {
    if (logfiles[i].getLen() > largestSize) {
      largestLogFile = i;
      largestSize = logfiles[i].getLen();
    }
  }
  assertTrue("There should be some log greater than size 0.", 0 < largestSize);
  // Set up a splitter that will throw an IOE on the output side
  WALSplitter logSplitter = new WALSplitter(wals,
      conf, HBASEDIR, fs, null, null, this.mode) {
    @Override
    protected Writer createWriter(Path logfile) throws IOException {
      Writer mockWriter = Mockito.mock(Writer.class);
      Mockito.doThrow(new IOException("Injected")).when(
        mockWriter).append(Mockito.<Entry>any());
      return mockWriter;
    }
  };
  // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
  // the thread dumping in a background thread so it does not hold up the test.
  final AtomicBoolean stop = new AtomicBoolean(false);
  final Thread someOldThread = new Thread("Some-old-thread") {
    @Override
    public void run() {
      while (!stop.get()) {
        Threads.sleep(10);
      }
    }
  };
  someOldThread.setDaemon(true);
  someOldThread.start();
  final Thread t = new Thread("Background-thread-dumper") {
    @Override
    public void run() {
      try {
        Threads.threadDumpingIsAlive(someOldThread);
      } catch (InterruptedException e) {
        // Restore the interrupt status instead of printing the stack trace; the dumper
        // thread simply ends here.
        Thread.currentThread().interrupt();
      }
    }
  };
  t.setDaemon(true);
  t.start();
  try {
    logSplitter.splitLogFile(logfiles[largestLogFile], null);
    fail("Didn't throw!");
  } catch (IOException ioe) {
    assertTrue(ioe.toString().contains("Injected"));
  } finally {
    // Setting this to true will turn off the background thread dumper.
    stop.set(true);
  }
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Splits a WAL while the destination region's split-edit files are deleted right after the
 * writer is created (simulating a concurrent {@code replayRecoveredEditsIfAny()}), and expects
 * the split neither to throw nor to leave corrupt logs behind.
 *
 * @throws IOException if generating the WALs or inspecting the filesystem fails
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-4862">HBASE-4862</a>
 */
@Test (timeout=300000)
public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
  LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
  // Generate wals for our destination region
  String regionName = "r0";
  final Path regiondir = new Path(TABLEDIR, regionName);
  REGIONS.clear();
  REGIONS.add(regionName);
  generateWALs(-1);

  wals.getWAL(Bytes.toBytes(regionName));
  FileStatus[] logfiles = fs.listStatus(WALDIR);
  assertTrue("There should be some log file",
    logfiles != null && logfiles.length > 0);

  WALSplitter logSplitter = new WALSplitter(wals,
      conf, HBASEDIR, fs, null, null, this.mode) {
    @Override
    protected Writer createWriter(Path logfile)
    throws IOException {
      Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
      // After creating writer, simulate region's
      // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
      // region and delete them, excluding files with '.temp' suffix.
      NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
      if (files != null && !files.isEmpty()) {
        for (Path file : files) {
          if (!this.fs.delete(file, false)) {
            LOG.error("Failed delete of " + file);
          } else {
            LOG.debug("Deleted recovered.edits file=" + file);
          }
        }
      }
      return writer;
    }
  };
  try {
    logSplitter.splitLogFile(logfiles[0], null);
  } catch (IOException e) {
    // Log with the throwable so the stack trace is kept, not just e.toString().
    LOG.info("Splitting " + logfiles[0].getPath() + " failed", e);
    fail("Throws IOException when spliting "
        + "log, it is most likely because writing file does not "
        + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
  }
  if (fs.exists(CORRUPTDIR) && fs.listStatus(CORRUPTDIR).length > 0) {
    fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
  }
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Convenience overload that generates NUM_WRITERS WALs of ENTRIES entries each.
 *
 * @param leaveOpen passed through to the three-argument overload
 * @return whatever the three-argument overload returns
 */
private Writer generateWALs(int leaveOpen) throws IOException {
  final Writer stillOpen = generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
  return stillOpen;
}
项目:pbase    文件:TestWALSplit.java   
/**
 * Creates a WAL file named WAL_FILE_PREFIX + {@code suffix} in WALDIR without appending any
 * entries.
 *
 * @param suffix appended to the WAL file prefix to form the file name
 * @param closeFile whether the new writer is closed before returning
 */
private void injectEmptyFile(String suffix, boolean closeFile)
        throws IOException {
  final Path emptyWalPath = new Path(WALDIR, WAL_FILE_PREFIX + suffix);
  final Writer emptyWriter = wals.createWALWriter(fs, emptyWalPath, conf);
  if (closeFile) {
    emptyWriter.close();
  }
}