@Before public void setup() throws Exception { TEST_UTIL.startMiniZKCluster(); Configuration conf = TEST_UTIL.getConfiguration(); zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null); ds = new DummyServer(zkw, conf); ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); assertThat(ZKUtil.checkExists(zkw, zkw.baseZNode), not (is(-1))); LOG.debug(zkw.baseZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); assertThat(ZKUtil.checkExists(zkw, zkw.splitLogZNode), not (is(-1))); LOG.debug(zkw.splitLogZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.rsZNode); assertThat(ZKUtil.checkExists(zkw, zkw.rsZNode), not (is(-1))); SplitLogCounters.resetCounters(); executorService = new ExecutorService("TestSplitLogWorker"); executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); }
/** * Constructs a new assignment manager. * * @param master * @param serverManager * @param catalogTracker * @param service * @throws KeeperException * @throws IOException */ public AssignmentManager(Server master, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, final ExecutorService service) throws KeeperException, IOException { super(master.getZooKeeper()); this.master = master; this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = master.getConfiguration(); this.timeoutMonitor = new TimeoutMonitor( conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000), master, serverManager, conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000)); this.timerUpdater = new TimerUpdater(conf.getInt( "hbase.master.assignment.timerupdater.period", 10000), master); Threads.setDaemonThreadRunning(timerUpdater.getThread(), master.getServerName() + ".timerUpdater"); this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.balancer = balancer; this.threadPoolExecutorService = Executors.newCachedThreadPool(); }
/** * Modify table is async so wait on completion of the table operation in master. * @param tableName * @param htd * @throws IOException */ private void modifyTable(final byte [] tableName, final HTableDescriptor htd) throws IOException { MasterServices services = TEST_UTIL.getMiniHBaseCluster().getMaster(); ExecutorService executor = services.getExecutorService(); AtomicBoolean done = new AtomicBoolean(false); executor.registerListener(EventType.C_M_MODIFY_TABLE, new DoneListener(done)); this.admin.modifyTable(tableName, htd); while (!done.get()) { synchronized (done) { try { done.wait(100); } catch (InterruptedException e) { e.printStackTrace(); } } } executor.unregisterListener(EventType.C_M_MODIFY_TABLE); }
/** * Run a simple server shutdown handler. * @throws KeeperException * @throws IOException */ @Test public void testShutdownHandler() throws KeeperException, IOException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testShutdownHandler"); // We need a mocked catalog tracker. CatalogTracker ct = Mockito.mock(CatalogTracker.class); LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server .getConfiguration()); // Create an AM. AssignmentManager am = new AssignmentManager(this.server, this.serverManager, ct, balancer, executor); try { processServerShutdownHandler(ct, am, false, null); } finally { executor.shutdown(); am.shutdown(); // Clean up all znodes ZKAssign.deleteAllNodes(this.watcher); } }
/** * Run a simple server shutdown handler. * @throws KeeperException * @throws IOException */ @Test (timeout=180000) public void testShutdownHandler() throws KeeperException, IOException, CoordinatedStateException, ServiceException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testShutdownHandler"); // Create an AM. AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager( this.server, this.serverManager); try { processServerShutdownHandler(am, false); } finally { executor.shutdown(); am.shutdown(); // Clean up all znodes ZKAssign.deleteAllNodes(this.watcher); } }
/** * Run a simple server shutdown handler. * @throws KeeperException * @throws IOException */ @Test public void testShutdownHandler() throws KeeperException, IOException, ServiceException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testShutdownHandler"); // We need a mocked catalog tracker. CatalogTracker ct = Mockito.mock(CatalogTracker.class); // Create an AM. AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager( this.server, this.serverManager); try { processServerShutdownHandler(ct, am, false); } finally { executor.shutdown(); am.shutdown(); // Clean up all znodes ZKAssign.deleteAllNodes(this.watcher); } }
@Before public void setup() throws Exception { TEST_UTIL.startMiniZKCluster(); zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null); ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); LOG.debug(zkw.baseZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); LOG.debug(zkw.splitLogZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.rsZNode); assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1); SplitLogCounters.resetCounters(); executorService = new ExecutorService("TestSplitLogWorker"); executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); }
@Before public void setup() throws Exception { TEST_UTIL.startMiniZKCluster(); Configuration conf = TEST_UTIL.getConfiguration(); zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null); ds = new DummyServer(zkw, conf); ZKUtil.deleteChildrenRecursively(zkw, zkw.znodePaths.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.znodePaths.baseZNode); assertThat(ZKUtil.checkExists(zkw, zkw.znodePaths.baseZNode), not (is(-1))); LOG.debug(zkw.znodePaths.baseZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.znodePaths.splitLogZNode); assertThat(ZKUtil.checkExists(zkw, zkw.znodePaths.splitLogZNode), not (is(-1))); LOG.debug(zkw.znodePaths.splitLogZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.znodePaths.rsZNode); assertThat(ZKUtil.checkExists(zkw, zkw.znodePaths.rsZNode), not (is(-1))); SplitLogCounters.resetCounters(); executorService = new ExecutorService("TestSplitLogWorker"); executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); }
/** * Constructs a new assignment manager. * * @param master * @param serverManager * @param catalogTracker * @param service * @throws KeeperException * @throws IOException */ public AssignmentManager(Server master, ServerManager serverManager, CatalogTracker catalogTracker, final ExecutorService service) throws KeeperException, IOException { super(master.getZooKeeper()); this.master = master; this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = master.getConfiguration(); this.timeoutMonitor = new TimeoutMonitor( conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000), master, serverManager, conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000)); Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), master.getServerName() + ".timeoutMonitor"); this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.threadPoolExecutorService = Executors.newCachedThreadPool(); }
/** * Run a simple server shutdown handler. * @throws KeeperException * @throws IOException */ @Test public void testShutdownHandler() throws KeeperException, IOException, CoordinatedStateException, ServiceException { // Create and startup an executor. This is used by AssignmentManager // handling zk callbacks. ExecutorService executor = startupMasterExecutor("testShutdownHandler"); // We need a mocked catalog tracker. CatalogTracker ct = Mockito.mock(CatalogTracker.class); // Create an AM. AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager( this.server, this.serverManager); try { processServerShutdownHandler(ct, am, false); } finally { executor.shutdown(); am.shutdown(); // Clean up all znodes ZKAssign.deleteAllNodes(this.watcher); } }
/** * Fully specify all necessary components of a snapshot manager. Exposed for testing. * @param master services for the master where the manager is running * @param coordinator procedure coordinator instance. exposed for testing. * @param pool HBase ExecutorServcie instance, exposed for testing. */ public SnapshotManager(final MasterServices master, final MasterMetrics metricsMaster, ProcedureCoordinator coordinator, ExecutorService pool) throws IOException, UnsupportedOperationException { this.master = master; this.metricsMaster = metricsMaster; this.rootDir = master.getMasterFileSystem().getRootDir(); checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem()); this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT); this.coordinator = coordinator; this.executorService = pool; resetTempDir(); }
/** * Modify table is async so wait on completion of the table operation in master. * @param tableName * @param htd * @throws IOException */ private void modifyTable(final byte [] tableName, final HTableDescriptor htd) throws IOException { MasterServices services = TEST_UTIL.getMiniHBaseCluster().getMaster(); ExecutorService executor = services.getExecutorService(); AtomicBoolean done = new AtomicBoolean(false); executor.registerListener(EventType.C_M_MODIFY_TABLE, new DoneListener(done)); this.admin.modifyTable(tableName, htd); while (!done.get()) { //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (done) { try { done.wait(100); } catch (InterruptedException e) { e.printStackTrace(); } } } executor.unregisterListener(EventType.C_M_MODIFY_TABLE); }
protected void dumpExecutors(ExecutorService service, PrintWriter out) throws IOException { if (service == null) { out.println("ExecutorService is not initialized"); return; } Map<String, ExecutorStatus> statuses = service.getAllExecutorStatuses(); for (ExecutorStatus status : statuses.values()) { status.dumpTo(out, " "); } }
/** * Fully specify all necessary components of a snapshot manager. Exposed for testing. * @param master services for the master where the manager is running * @param coordinator procedure coordinator instance. exposed for testing. * @param pool HBase ExecutorServcie instance, exposed for testing. */ public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster, ProcedureCoordinator coordinator, ExecutorService pool) throws IOException, UnsupportedOperationException { this.master = master; this.rootDir = master.getMasterFileSystem().getRootDir(); checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem()); this.coordinator = coordinator; this.executorService = pool; resetTempDir(); }
private ExecutorService startupMasterExecutor(final String name) { ExecutorService executor = new ExecutorService(name); executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3); executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3); executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3); executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3); return executor; }
protected void dumpExecutors(ExecutorService service, PrintWriter out) throws IOException { Map<String, ExecutorStatus> statuses = service.getAllExecutorStatuses(); for (ExecutorStatus status : statuses.values()) { status.dumpTo(out, " "); } }
/** * Fully specify all necessary components of a snapshot manager. Exposed for testing. * @param master services for the master where the manager is running * @param coordinator procedure coordinator instance. exposed for testing. * @param pool HBase ExecutorServcie instance, exposed for testing. */ public SnapshotManager(final MasterServices master, final MasterMetrics metricsMaster, ProcedureCoordinator coordinator, ExecutorService pool) throws IOException, UnsupportedOperationException { this.master = master; this.metricsMaster = metricsMaster; this.rootDir = master.getMasterFileSystem().getRootDir(); checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem()); this.coordinator = coordinator; this.executorService = pool; resetTempDir(); }