/**
 * Creates a handler for a single WAL split task, registered with the event type
 * {@link EventType#RS_LOG_REPLAY}.
 * <p>
 * Construction has a side effect: {@code inProgressTasks} is incremented by one here.
 *
 * @param server            server this handler runs on; also supplies the server name
 * @param coordination      split log worker coordination kept by the handler
 * @param splitDetails      details of the split task kept by the handler
 * @param reporter          progress reporter kept by the handler
 * @param inProgressTasks   shared counter of in-progress tasks; incremented by this constructor
 * @param splitTaskExecutor executor for the split task, kept by the handler
 * @param mode              recovery mode kept by the handler
 */
public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
    SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
    AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
  super(server, EventType.RS_LOG_REPLAY);

  // Plain collaborators handed in by the caller.
  this.coordination = coordination;
  this.splitTaskDetails = splitDetails;
  this.splitTaskExecutor = splitTaskExecutor;
  this.mode = mode;
  this.reporter = reporter;

  // Count this task as in progress as soon as the handler exists.
  this.inProgressTasks = inProgressTasks;
  inProgressTasks.incrementAndGet();

  this.serverName = server.getServerName();
}
/**
 * Builds a splitter together with its entry buffers and output sink.
 * <p>
 * The instance keeps the configuration returned by {@code HBaseConfiguration.create(conf)}
 * and sets {@code HConstants.RPC_CODEC_CONF_KEY} on it to the WAL cell codec class name.
 * The output sink implementation is selected by the {@code SPLIT_WRITER_CREATION_BOUNDED}
 * setting (default {@code false}).
 *
 * @param factory                    WAL factory kept by the splitter
 * @param conf                       configuration the splitter's own configuration is created from
 * @param rootDir                    root directory kept by the splitter
 * @param fs                         file system kept by the splitter
 * @param idChecker                  sequence id checker kept by the splitter
 * @param splitLogWorkerCoordination split log worker coordination kept by the splitter
 */
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, FileSystem fs,
    LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
  // The RPC codec setting is applied to the configuration held by this instance.
  this.conf = HBaseConfiguration.create(conf);
  final String walCodecClass =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  this.conf.set(HConstants.RPC_CODEC_CONF_KEY, walCodecClass);

  this.walFactory = factory;
  this.rootDir = rootDir;
  this.fs = fs;
  this.sequenceIdChecker = idChecker;
  this.splitLogWorkerCoordination = splitLogWorkerCoordination;

  final PipelineController pipeline = new PipelineController();
  this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);

  // Buffer size defaults to 128 MB.
  final int bufferSize =
      this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024);
  entryBuffers = new EntryBuffers(pipeline, bufferSize, splitWriterCreationBounded);

  final int writerThreads =
      this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
  outputSink = splitWriterCreationBounded
      ? new BoundedLogWriterCreationOutputSink(pipeline, entryBuffers, writerThreads)
      : new LogRecoveredEditsOutputSink(pipeline, entryBuffers, writerThreads);
}
/**
 * Splits one WAL file into the recovered-edits directories of the regions it contains.
 * <p>
 * Main entry point for distributed log splitting, called from SplitLogWorker. A log file
 * holding edits for N regions produces N recovered.edits files.
 * <p>
 * A fresh {@link WALSplitter} is created for every call and the work is delegated to it.
 *
 * @param rootDir                    root directory passed to the splitter
 * @param logfile                    the WAL file to split
 * @param fs                         file system passed to the splitter
 * @param conf                       configuration passed to the splitter
 * @param reporter                   progress-able that can interrupt the split
 * @param idChecker                  sequence id checker passed to the splitter
 * @param splitLogWorkerCoordination coordination passed to the splitter
 * @param factory                    WAL factory passed to the splitter
 * @return false if it is interrupted by the progress-able.
 * @throws IOException if the underlying split fails with an I/O error
 */
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
    Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
    SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
    throws IOException {
  return new WALSplitter(factory, conf, rootDir, fs, idChecker, splitLogWorkerCoordination)
      .splitLogFile(logfile, reporter);
}
/**
 * Creates a handler for a single WAL split task, registered with the event type
 * {@link EventType#RS_LOG_REPLAY}.
 * <p>
 * Construction has a side effect: {@code inProgressTasks} is incremented by one here.
 *
 * @param server            server this handler runs on; also supplies the server name
 * @param coordination      split log worker coordination kept by the handler
 * @param splitDetails      details of the split task kept by the handler
 * @param reporter          progress reporter kept by the handler
 * @param inProgressTasks   shared counter of in-progress tasks; incremented by this constructor
 * @param splitTaskExecutor executor for the split task, kept by the handler
 */
public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
    SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
    AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
  super(server, EventType.RS_LOG_REPLAY);

  // Plain collaborators handed in by the caller.
  this.coordination = coordination;
  this.splitTaskDetails = splitDetails;
  this.splitTaskExecutor = splitTaskExecutor;
  this.reporter = reporter;

  // Count this task as in progress as soon as the handler exists.
  this.inProgressTasks = inProgressTasks;
  inProgressTasks.incrementAndGet();

  this.serverName = server.getServerName();
}
private void startServiceThreads() throws IOException { // Start executor services this.service.startExecutorService(ExecutorType.RS_OPEN_REGION, conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_OPEN_META, conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION, conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_CLOSE_META, conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS, conf.getInt("hbase.regionserver.region.replica.flusher.threads", conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); } Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore); if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner); // Leases is not a Thread. Internally it runs a daemon thread. 
If it gets // an unhandled exception, it will just exit. Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker", uncaughtExceptionHandler); if (this.replicationSourceHandler == this.replicationSinkHandler && this.replicationSourceHandler != null) { this.replicationSourceHandler.startReplicationService(); } else { if (this.replicationSourceHandler != null) { this.replicationSourceHandler.startReplicationService(); } if (this.replicationSinkHandler != null) { this.replicationSinkHandler.startReplicationService(); } } // Create the log splitting worker and start it // set a smaller retries to fast fail otherwise splitlogworker could be blocked for // quite a while inside HConnection layer. The worker won't be available for other // tasks even after current task is preempted after a split task times out. Configuration sinkConf = HBaseConfiguration.create(conf); sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory); splitLogWorker.start(); }
private void startServiceThreads() throws IOException { // Start executor services this.service.startExecutorService(ExecutorType.RS_OPEN_REGION, conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_OPEN_META, conf.getInt("hbase.regionserver.executor.openmeta.threads", 1)); this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION, conf.getInt("hbase.regionserver.executor.closeregion.threads", 3)); this.service.startExecutorService(ExecutorType.RS_CLOSE_META, conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() + ".compactionChecker", uncaughtExceptionHandler); Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() + ".periodicFlusher", uncaughtExceptionHandler); if (this.healthCheckChore != null) { Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker", uncaughtExceptionHandler); } if (this.nonceManagerChore != null) { Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner", uncaughtExceptionHandler); } if (this.storefileRefresher != null) { Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher", uncaughtExceptionHandler); } // Leases is not a Thread. Internally it runs a daemon thread. 
If it gets // an unhandled exception, it will just exit. this.leases.setName(getName() + ".leaseChecker"); this.leases.start(); if (this.replicationSourceHandler == this.replicationSinkHandler && this.replicationSourceHandler != null) { this.replicationSourceHandler.startReplicationService(); } else { if (this.replicationSourceHandler != null) { this.replicationSourceHandler.startReplicationService(); } if (this.replicationSinkHandler != null) { this.replicationSinkHandler.startReplicationService(); } } // Create the log splitting worker and start it // set a smaller retries to fast fail otherwise splitlogworker could be blocked for // quite a while inside HConnection layer. The worker won't be available for other // tasks even after current task is preempted after a split task times out. Configuration sinkConf = HBaseConfiguration.create(conf); sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory); splitLogWorker.start(); }
/**
 * Retrieves the coordination used by the split log worker.
 *
 * @return the {@link SplitLogWorkerCoordination} for the split log worker
 */
SplitLogWorkerCoordination getSplitLogWorkerCoordination();