/** * @param conf must not be null, will keep a reference to read params in later reader/writer * instances. * @param listeners may be null. will be given to all created wals (and not meta-wals) * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations * to make a directory */ public WALFactory(final Configuration conf, final List<WALActionsListener> listeners, final String factoryId) throws IOException { // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); /* TODO Both of these are probably specific to the fs wal provider */ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, DefaultWALProvider.Reader.class); this.conf = conf; this.factoryId = factoryId; // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null); } else { // special handling of existing configuration behavior. LOG.warn("Running with WAL disabled."); provider = new DisabledWALProvider(); provider.init(this, conf, null, factoryId); } }
/** * @param identifier may not be null, contents will not be altered */ public WAL getMetaWAL(final byte[] identifier) throws IOException { WALProvider metaProvider = this.metaProvider.get(); if (null == metaProvider) { final WALProvider temp = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER, Collections.<WALActionsListener>singletonList(new MetricsWAL()), DefaultWALProvider.META_WAL_PROVIDER_ID); if (this.metaProvider.compareAndSet(null, temp)) { metaProvider = temp; } else { // reference must now be to a provider created in another thread. temp.close(); metaProvider = this.metaProvider.get(); } } return metaProvider.getWAL(identifier); }
/**
 * Notifies registered listeners of an append without writing anything. The size reported is
 * the sum of the estimated serialized sizes of the edit's cells, and the time reported is how
 * long that estimate took, in milliseconds.
 *
 * @return always -1
 */
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
    boolean inMemstore) {
  if (this.listeners.isEmpty()) {
    return -1;
  }
  final long startNanos = System.nanoTime();
  long estimatedSize = 0;
  for (Cell c : edits.getCells()) {
    estimatedSize += CellUtil.estimatedSerializedSizeOf(c);
  }
  final long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
  for (WALActionsListener each : this.listeners) {
    each.postAppend(estimatedSize, elapsedMillis);
  }
  return -1;
}
/** * Convenience method creating new HRegions. Used by createTable. The {@link WAL} for the created * region needs to be closed explicitly, if it is not null. Use {@link HRegion#getWAL()} to get * access. * * @param info Info for region to create. * @param rootDir Root directory for HBase instance * @param tableDir table directory * @param wal shared WAL * @param initialize - true to initialize the region * @param ignoreWAL - true to skip generate new wal if it is null, mostly for createTable * @return new HRegion * @throws IOException */ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir, final Configuration conf, final HTableDescriptor hTableDescriptor, final WAL wal, final boolean initialize, final boolean ignoreWAL) throws IOException { LOG.info("creating HRegion " + info.getTable().getNameAsString() + " HTD == " + hTableDescriptor + " RootDir = " + rootDir + " Table name == " + info.getTable().getNameAsString()); FileSystem fs = FileSystem.get(conf); HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info); WAL effectiveWAL = wal; if (wal == null && !ignoreWAL) { // TODO HBASE-11983 There'll be no roller for this wal? // The WAL subsystem will use the default rootDir rather than the passed // in rootDir // unless I pass along via the conf. Configuration confForWAL = new Configuration(conf); confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); effectiveWAL = (new WALFactory(confForWAL, Collections.<WALActionsListener>singletonList(new MetricsWAL()), "hregion-" + RandomStringUtils.randomNumeric(8))).getWAL(info.getEncodedNameAsBytes()); } HRegion region = HRegion.newHRegion(tableDir, effectiveWAL, fs, conf, info, hTableDescriptor, null); if (initialize) region.initialize(null); return region; }
/**
 * Creates the single {@code IOTestWAL} backing this provider. May only be called once.
 *
 * @param factory factory that made us, identity used for FS layout. may not be null
 * @param conf may not be null
 * @param listeners may be null
 * @param providerId differentiate between providers from one factory, used for FS layout. may
 *          be null, in which case {@code DEFAULT_PROVIDER_ID} is used
 * @throws IllegalStateException if this provider was already initialized
 * @throws IOException propagated from filesystem access or WAL construction
 */
@Override
public void init(final WALFactory factory, final Configuration conf,
    final List<WALActionsListener> listeners, String providerId) throws IOException {
  if (log != null) {
    throw new IllegalStateException("WALProvider.init should only be called once.");
  }
  final String effectiveId = (providerId != null) ? providerId : DEFAULT_PROVIDER_ID;
  // Only the meta provider passes its id as the final constructor argument; others pass null.
  final String metaIdOrNull =
      META_WAL_PROVIDER_ID.equals(effectiveId) ? META_WAL_PROVIDER_ID : null;
  final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + effectiveId;
  log = new IOTestWAL(FileSystem.get(conf), FSUtils.getRootDir(conf),
      DefaultWALProvider.getWALDirectoryName(factory.factoryId),
      HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, metaIdOrNull);
}
/**
 * Notifies registered listeners of an append without writing anything. Listeners receive the
 * summed estimated serialized size of the edit's cells and the milliseconds spent computing
 * that estimate. {@code sequenceId} and {@code memstoreKVs} are not touched.
 *
 * @return always -1
 */
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
    AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) {
  if (this.listeners.isEmpty()) {
    return -1;
  }
  final long startNanos = System.nanoTime();
  long estimatedSize = 0;
  for (Cell c : edits.getCells()) {
    estimatedSize += CellUtil.estimatedSerializedSizeOf(c);
  }
  final long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
  for (WALActionsListener each : this.listeners) {
    each.postAppend(estimatedSize, elapsedMillis);
  }
  return -1;
}
/**
 * Notifies registered listeners of an append without writing anything. Each listener receives
 * the summed estimated serialized size of the edit's cells, the milliseconds spent computing
 * that estimate, and the key and edit themselves.
 *
 * @return always -1
 */
@Override
public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException {
  if (this.listeners.isEmpty()) {
    return -1;
  }
  final long startNanos = System.nanoTime();
  long estimatedSize = 0;
  for (Cell c : edits.getCells()) {
    estimatedSize += PrivateCellUtil.estimatedSerializedSizeOf(c);
  }
  final long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
  for (WALActionsListener each : this.listeners) {
    each.postAppend(estimatedSize, elapsedMillis, key, edits);
  }
  return -1;
}
/**
 * Records the factory, listeners and provider id, then resolves the region grouping strategy
 * from the configuration. May only be called once.
 *
 * @param listeners may be null; otherwise stored behind an unmodifiable view
 * @throws IllegalStateException if a strategy has already been set by an earlier call
 */
@Override
public void init(final WALFactory factory, final Configuration conf,
    final List<WALActionsListener> listeners, final String providerId) throws IOException {
  if (strategy != null) {
    throw new IllegalStateException("WALProvider.init should only be called once.");
  }
  this.factory = factory;
  if (listeners == null) {
    this.listeners = null;
  } else {
    this.listeners = Collections.unmodifiableList(listeners);
  }
  this.providerId = providerId;
  this.strategy =
      getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
}
@Override public void init(final WALFactory factory, final Configuration conf, final List<WALActionsListener> listeners, final String providerId) throws IOException { super.init(factory, conf, listeners, providerId); // no need to check for and close down old providers; our parent class will throw on re-invoke delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS))]; for (int i = 0; i < delegates.length; i++) { delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, providerId + i); } LOG.info("Configured to run with " + delegates.length + " delegate WAL providers."); }
/**
 * Creates the single {@code DisabledWAL} served by this provider, located under the root
 * directory at {@code providerId} (or "defaultDisabled" when the id is null). The supplied
 * listeners are not forwarded to the WAL. May only be called once.
 *
 * @throws IllegalStateException if this provider was already initialized
 */
@Override
public void init(final WALFactory factory, final Configuration conf,
    final List<WALActionsListener> listeners, String providerId) throws IOException {
  if (disabled != null) {
    throw new IllegalStateException("WALProvider.init should only be called once.");
  }
  final String effectiveId = (providerId == null) ? "defaultDisabled" : providerId;
  final Path walPath = new Path(FSUtils.getRootDir(conf), effectiveId);
  disabled = new DisabledWAL(walPath, conf, null);
}
/**
 * Creates a disabled WAL with its own coprocessor host.
 *
 * @param path path recorded for this WAL
 * @param conf configuration passed to the coprocessor host
 * @param listeners listeners to register up front; may be null
 */
public DisabledWAL(final Path path, final Configuration conf,
    final List<WALActionsListener> listeners) {
  this.coprocessorHost = new WALCoprocessorHost(this, conf);
  this.path = path;
  if (listeners == null) {
    return;
  }
  for (WALActionsListener each : listeners) {
    registerWALActionsListener(each);
  }
}
/**
 * Flips the closed flag and, only on the call that actually flips it, tells every registered
 * listener that a log close was requested. Later calls do nothing.
 */
@Override
public void shutdown() {
  if (!closed.compareAndSet(false, true)) {
    // Already closed by an earlier call.
    return;
  }
  if (this.listeners.isEmpty()) {
    return;
  }
  for (WALActionsListener each : this.listeners) {
    each.logCloseRequested();
  }
}
/**
 * Performs no actual sync; only notifies every registered listener via {@code postSync},
 * passing zero for both arguments.
 */
@Override
public void sync() {
  if (this.listeners.isEmpty()) {
    return;
  }
  for (WALActionsListener each : this.listeners) {
    each.postSync(0L, 0);
  }
}
/** * Setup WAL log and replication if enabled. * Replication setup is done in here because it wants to be hooked up to WAL. * * @return A WAL instance. * @throws IOException */ private WALFactory setupWALAndReplication() throws IOException { // TODO Replication make assumptions here based on the default filesystem impl final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString()); Path logdir = new Path(rootDir, logName); if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir); if (this.fs.exists(logdir)) { throw new RegionServerRunningException( "Region server has already " + "created directory at " + this.serverName.toString()); } // Instantiate replication manager if replication enabled. Pass it the // log directories. createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); // listeners the wal factory will add to wals it creates. final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); listeners.add(new MetricsWAL()); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. listeners.add(this.replicationSourceHandler.getWALActionsListener()); } return new WALFactory(conf, listeners, serverName.toString()); }
public void addWAL(final WAL wal) { if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { wal.registerWALActionsListener(new WALActionsListener.Base() { @Override public void logRollRequested(boolean lowReplicas) { walNeedsRoll.put(wal, Boolean.TRUE); // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized(rollLog) { rollLog.set(true); rollLog.notifyAll(); } } }); } }
private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table, final byte[] row) throws IOException { final Admin admin = utility.getHBaseAdmin(); final MiniHBaseCluster cluster = utility.getMiniHBaseCluster(); // find the region that corresponds to the given row. HRegion region = null; for (HRegion candidate : cluster.getRegions(table)) { if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) { region = candidate; break; } } assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region); final CountDownLatch latch = new CountDownLatch(1); // listen for successful log rolls final WALActionsListener listener = new WALActionsListener.Base() { @Override public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { latch.countDown(); } }; region.getWAL().registerWALActionsListener(listener); // request a roll admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(), region.getRegionInfo().getRegionName())); // wait try { latch.await(); } catch (InterruptedException exception) { LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " + "replication tests fail, it's probably because we should still be waiting."); Thread.currentThread().interrupt(); } region.getWAL().unregisterWALActionsListener(listener); }
/**
 * Creates a fresh reader manager, a path watcher, and a WAL whose factory is named after the
 * running test method and has the path watcher as its only listener.
 */
@Before
public void setUp() throws Exception {
  logManager = new ReplicationWALReaderManager(fs, conf);
  pathWatcher = new PathWatcher();
  final List<WALActionsListener> walListeners = new ArrayList<WALActionsListener>();
  walListeners.add(pathWatcher);
  log = new WALFactory(conf, walListeners, tn.getMethodName())
      .getWAL(info.getEncodedNameAsBytes());
}
/** * Create an unmanaged WAL. Be sure to close it when you're through. */ public static WAL createWal(final Configuration conf, final Path rootDir, final HRegionInfo hri) throws IOException { // The WAL subsystem will use the default rootDir rather than the passed in rootDir // unless I pass along via the conf. Configuration confForWAL = new Configuration(conf); confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); return (new WALFactory(confForWAL, Collections.<WALActionsListener>singletonList(new MetricsWAL()), "hregion-" + RandomStringUtils.randomNumeric(8))). getWAL(hri.getEncodedNameAsBytes()); }
/**
 * Builds a WAL factory rooted at {@code rootDir}, with a single {@code MetricsWAL} listener
 * and a factory id of "hregion-" followed by eight random digits.
 *
 * @param conf base configuration; it is copied, not modified
 * @param rootDir value stored under {@code HConstants.HBASE_DIR} in the copied configuration
 * @return the new factory
 * @throws IOException propagated from factory construction
 */
static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
  final Configuration confForWAL = new Configuration(conf);
  confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
  final List<WALActionsListener> metricsOnly =
      Collections.<WALActionsListener>singletonList(new MetricsWAL());
  final String factoryId = "hregion-" + RandomStringUtils.randomNumeric(8);
  return new WALFactory(confForWAL, metricsOnly, factoryId);
}
/** * Called by {@link #instantiateHLog(Path, Path)} setting up WAL instance. Add any * {@link WALActionsListener}s you want inserted before WAL startup. * @return List of WALActionsListener that will be passed in to {@link HLog} on construction. */ protected List<WALActionsListener> getWALActionListeners() { List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); // Log roller. this.hlogRoller = new LogRoller(this, this); listeners.add(this.hlogRoller); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. listeners.add(this.replicationSourceHandler.getWALActionsListener()); } return listeners; }
protected List<WALActionsListener> getMetaWALActionListeners() { List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); // Using a tmp log roller to ensure metaLogRoller is alive once it is not // null (addendum patch on HBASE-7213) MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this); String n = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(tmpLogRoller.getThread(), n + "MetaLogRoller", uncaughtExceptionHandler); this.metaHLogRoller = tmpLogRoller; tmpLogRoller = null; listeners.add(this.metaHLogRoller); return listeners; }
/** * Setup WAL log and replication if enabled. * Replication setup is done in here because it wants to be hooked up to WAL. * * @return A WAL instance. * @throws IOException */ private WALFactory setupWALAndReplication() throws IOException { // TODO Replication make assumptions here based on the default filesystem impl final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString()); Path logdir = new Path(rootDir, logName); if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir); if (this.fs.exists(logdir)) { throw new RegionServerRunningException("Region server has already " + "created directory at " + this.serverName.toString()); } // Instantiate replication manager if replication enabled. Pass it the // log directories. createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); // listeners the wal factory will add to wals it creates. final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); listeners.add(new MetricsWAL()); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. listeners.add(this.replicationSourceHandler.getWALActionsListener()); } return new WALFactory(conf, listeners, serverName.toString()); }
/** * Convenience method creating new HRegions. Used by createTable. * The {@link WAL} for the created region needs to be closed * explicitly, if it is not null. * Use {@link HRegion#getWAL()} to get access. * * @param info Info for region to create. * @param rootDir Root directory for HBase instance * @param tableDir table directory * @param wal shared WAL * @param initialize - true to initialize the region * @param ignoreWAL - true to skip generate new wal if it is null, mostly for createTable * @return new HRegion * @throws IOException */ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir, final Configuration conf, final HTableDescriptor hTableDescriptor, final WAL wal, final boolean initialize, final boolean ignoreWAL) throws IOException { LOG.info("creating HRegion " + info.getTable().getNameAsString() + " HTD == " + hTableDescriptor + " RootDir = " + rootDir + " Table name == " + info.getTable().getNameAsString()); FileSystem fs = FileSystem.get(conf); HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info); WAL effectiveWAL = wal; if (wal == null && !ignoreWAL) { // TODO HBASE-11983 There'll be no roller for this wal? // The WAL subsystem will use the default rootDir rather than the passed in rootDir // unless I pass along via the conf. Configuration confForWAL = new Configuration(conf); confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); effectiveWAL = (new WALFactory(confForWAL, Collections.<WALActionsListener>singletonList(new MetricsWAL()), "hregion-" + RandomStringUtils.randomNumeric(8))). getWAL(info.getEncodedNameAsBytes()); } HRegion region = HRegion.newHRegion(tableDir, effectiveWAL, fs, conf, info, hTableDescriptor, null); if (initialize) { // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when // verifying the WALEdits. region.setSequenceId(region.initialize(null)); } return region; }
private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table, final byte[] row) throws IOException { final Admin admin = utility.getHBaseAdmin(); final MiniHBaseCluster cluster = utility.getMiniHBaseCluster(); // find the region that corresponds to the given row. HRegion region = null; for (HRegion candidate : cluster.getRegions(table)) { if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) { region = candidate; break; } } assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region); final CountDownLatch latch = new CountDownLatch(1); // listen for successful log rolls final WALActionsListener listener = new WALActionsListener.Base() { @Override public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { latch.countDown(); } }; region.getWAL().registerWALActionsListener(listener); // request a roll admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(), region.getRegionName())); // wait try { latch.await(); } catch (InterruptedException exception) { LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " + "replication tests fail, it's probably because we should still be waiting."); Thread.currentThread().interrupt(); } region.getWAL().unregisterWALActionsListener(listener); }
/**
 * Creates a fresh reader manager, a path watcher, and a WAL from a factory with the id
 * "some server" that has the path watcher as its only listener.
 */
@Before
public void setUp() throws Exception {
  logManager = new ReplicationWALReaderManager(fs, conf);
  pathWatcher = new PathWatcher();
  final List<WALActionsListener> walListeners = new ArrayList<WALActionsListener>();
  walListeners.add(pathWatcher);
  log = new WALFactory(conf, walListeners, "some server")
      .getWAL(info.getEncodedNameAsBytes());
}
/** * Called by {@link #instantiateHLog(Path, String)} setting up WAL instance. * Add any {@link WALActionsListener}s you want inserted before WAL startup. * @return List of WALActionsListener that will be passed in to * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on construction. */ protected List<WALActionsListener> getWALActionListeners() { List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); // Log roller. this.hlogRoller = new LogRoller(this, this); listeners.add(this.hlogRoller); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. listeners.add(this.replicationSourceHandler.getWALActionsListener()); } return listeners; }
protected List<WALActionsListener> getMetaWALActionListeners() { List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); // Using a tmp log roller to ensure metaLogRoller is alive once it is not // null MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this); String n = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(tmpLogRoller.getThread(), n + "-MetaLogRoller", uncaughtExceptionHandler); this.metaHLogRoller = tmpLogRoller; tmpLogRoller = null; listeners.add(this.metaHLogRoller); return listeners; }
/**
 * Creates a fresh reader manager, a path watcher, and an HLog named "test" under
 * {@code hbaseDir} that has the path watcher as its only listener.
 */
@Before
public void setUp() throws Exception {
  logManager = new ReplicationHLogReaderManager(fs, conf);
  pathWatcher = new PathWatcher();
  final List<WALActionsListener> walListeners = new ArrayList<WALActionsListener>();
  walListeners.add(pathWatcher);
  log = HLogFactory.createHLog(fs, hbaseDir, "test", conf, walListeners, "some server");
}
/** * Called by {@link #instantiateHLog(Path, Path)} setting up WAL instance. * Add any {@link WALActionsListener}s you want inserted before WAL startup. * @return List of WALActionsListener that will be passed in to * {@link HLog} on construction. */ protected List<WALActionsListener> getWALActionListeners() { List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); // Log roller. this.hlogRoller = new LogRoller(this, this); listeners.add(this.hlogRoller); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. listeners.add(this.replicationSourceHandler.getWALActionsListener()); } return listeners; }