void processFavoredNodes(List<HRegionInfo> regions) throws IOException { if (!shouldAssignRegionsWithFavoredNodes) return; // The AM gets the favored nodes info for each region and updates the meta // table with that info Map<HRegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<HRegionInfo, List<ServerName>>(); for (HRegionInfo region : regions) { regionToFavoredNodes.put(region, ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region)); } FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, this.server.getConnection()); }
@BeforeClass public static void setupBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); // Enable the favored nodes based load balancer conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, FavoredNodeLoadBalancer.class, LoadBalancer.class); conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); TEST_UTIL.startMiniCluster(SLAVES); }
@BeforeClass public static void setupBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); // Enable the favored nodes based load balancer conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, FavoredNodeLoadBalancer.class, LoadBalancer.class); conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); TEST_UTIL.startMiniCluster(SLAVES); CONNECTION = TEST_UTIL.getConnection(); admin = CONNECTION.getAdmin(); rp = new RegionPlacementMaintainer(conf); }
void processFavoredNodes(List<HRegionInfo> regions) throws IOException { if (!shouldAssignRegionsWithFavoredNodes) return; // The AM gets the favored nodes info for each region and updates the meta // table with that info Map<HRegionInfo, List<ServerName>> regionToFavoredNodes = new HashMap<HRegionInfo, List<ServerName>>(); for (HRegionInfo region : regions) { regionToFavoredNodes.put(region, ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region)); } FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker); }
@BeforeClass public static void setupBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); // Enable the favored nodes based load balancer conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, FavoredNodeLoadBalancer.class, LoadBalancer.class); conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); TEST_UTIL.startMiniCluster(SLAVES); admin = new HBaseAdmin(conf); rp = new RegionPlacementMaintainer(conf); }
/** * Constructs a new assignment manager. * * @param server instance of HMaster this AM running inside * @param serverManager serverManager for associated HMaster * @param balancer implementation of {@link LoadBalancer} * @param service Executor service * @param metricsMaster metrics manager * @param tableLockManager TableLock manager * @throws KeeperException * @throws IOException */ public AssignmentManager(MasterServices server, ServerManager serverManager, final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, final TableLockManager tableLockManager) throws KeeperException, IOException, CoordinatedStateException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; this.executorService = service; this.regionStateStore = new RegionStateStore(server); this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = server.getConfiguration(); // Only read favored nodes if using the favored nodes load balancer. this.shouldAssignRegionsWithFavoredNodes = conf.getClass( HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( FavoredNodeLoadBalancer.class); try { if (server.getCoordinatedStateManager() != null) { this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager(); } else { this.tableStateManager = null; } } catch (InterruptedException e) { throw new InterruptedIOException(); } // This is the max attempts, not retries, so it should be at least 1. this.maximumAttempts = Math.max(1, this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( "hbase.meta.assignment.retry.sleeptime", 1000l); this.balancer = balancer; int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); this.regionStates = new RegionStates( server, tableStateManager, serverManager, regionStateStore); this.bulkAssignWaitTillAllAssigned = conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); this.bulkPerRegionOpenTimeGuesstimate = conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker"); zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, TimeUnit.SECONDS, threadFactory); this.tableLockManager = tableLockManager; this.metricsAssignmentManager = new MetricsAssignmentManager(); useZKForAssignment = ConfigUtil.useZKForAssignment(conf); }
/** * At master failover, for pending_open region, make sure * sendRegionOpen RPC call is sent to the target regionserver */ private void retrySendRegionOpen(final RegionState regionState) { this.executorService.submit( new EventHandler(server, EventType.M_MASTER_RECOVERY) { @Override public void process() throws IOException { HRegionInfo hri = regionState.getRegion(); ServerName serverName = regionState.getServerName(); ReentrantLock lock = locker.acquireLock(hri.getEncodedName()); try { for (int i = 1; i <= maximumAttempts; i++) { if (!serverManager.isServerOnline(serverName) || server.isStopped() || server.isAborted()) { return; // No need any more } try { if (!regionState.equals(regionStates.getRegionState(hri))) { return; // Region is not in the expected state any more } List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; if (shouldAssignRegionsWithFavoredNodes) { favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri); } RegionOpeningState regionOpenState = serverManager.sendRegionOpen( serverName, hri, -1, favoredNodes); if (regionOpenState == RegionOpeningState.FAILED_OPENING) { // Failed opening this region, this means the target server didn't get // the original region open RPC, so re-assign it with a new plan LOG.debug("Got failed_opening in retry sendRegionOpen for " + regionState + ", re-assign it"); invokeAssign(hri, true); } return; // Done. } catch (Throwable t) { if (t instanceof RemoteException) { t = ((RemoteException) t).unwrapRemoteException(); } // In case SocketTimeoutException/FailedServerException, retry if (t instanceof java.net.SocketTimeoutException || t instanceof FailedServerException) { Threads.sleep(100); continue; } // For other exceptions, re-assign it LOG.debug("Got exception in retry sendRegionOpen for " + regionState + ", re-assign it", t); invokeAssign(hri); return; // Done. } } } finally { lock.unlock(); } } }); }
@Test public void testFavoredNodesPresentForRoundRobinAssignment() throws HBaseIOException { LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(TEST_UTIL.getConfiguration()); balancer.setMasterServices(TEST_UTIL.getMiniHBaseCluster().getMaster()); List<ServerName> servers = new ArrayList<ServerName>(); for (int i = 0; i < SLAVES; i++) { ServerName server = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i).getServerName(); servers.add(server); } List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1); HRegionInfo region = new HRegionInfo(TableName.valueOf("foobar")); regions.add(region); Map<ServerName,List<HRegionInfo>> assignmentMap = balancer.roundRobinAssignment(regions, servers); Set<ServerName> serverBefore = assignmentMap.keySet(); List<ServerName> favoredNodesBefore = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(region); assertTrue(favoredNodesBefore.size() == 3); // the primary RS should be the one that the balancer's assignment returns assertTrue(ServerName.isSameHostnameAndPort(serverBefore.iterator().next(), favoredNodesBefore.get(PRIMARY))); // now remove the primary from the list of available servers List<ServerName> removedServers = removeMatchingServers(serverBefore, servers); // call roundRobinAssignment with the modified servers list assignmentMap = balancer.roundRobinAssignment(regions, servers); List<ServerName> favoredNodesAfter = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(region); assertTrue(favoredNodesAfter.size() == 3); // We don't expect the favored nodes assignments to change in multiple calls // to the roundRobinAssignment method in the balancer (relevant for AssignmentManager.assign // failures) assertTrue(favoredNodesAfter.containsAll(favoredNodesBefore)); Set<ServerName> serverAfter = assignmentMap.keySet(); // We expect the new RegionServer assignee to be one of the favored nodes // chosen earlier. assertTrue(ServerName.isSameHostnameAndPort(serverAfter.iterator().next(), favoredNodesBefore.get(SECONDARY)) || ServerName.isSameHostnameAndPort(serverAfter.iterator().next(), favoredNodesBefore.get(TERTIARY))); // put back the primary in the list of available servers servers.addAll(removedServers); // now roundRobinAssignment with the modified servers list should return the primary // as the regionserver assignee assignmentMap = balancer.roundRobinAssignment(regions, servers); Set<ServerName> serverWithPrimary = assignmentMap.keySet(); assertTrue(serverBefore.containsAll(serverWithPrimary)); // Make all the favored nodes unavailable for assignment removeMatchingServers(favoredNodesAfter, servers); // call roundRobinAssignment with the modified servers list assignmentMap = balancer.roundRobinAssignment(regions, servers); List<ServerName> favoredNodesNow = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(region); assertTrue(favoredNodesNow.size() == 3); assertTrue(!favoredNodesNow.contains(favoredNodesAfter.get(PRIMARY)) && !favoredNodesNow.contains(favoredNodesAfter.get(SECONDARY)) && !favoredNodesNow.contains(favoredNodesAfter.get(TERTIARY))); }
@Test public void testFavoredNodesPresentForRandomAssignment() throws HBaseIOException { LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(TEST_UTIL.getConfiguration()); balancer.setMasterServices(TEST_UTIL.getMiniHBaseCluster().getMaster()); List<ServerName> servers = new ArrayList<ServerName>(); for (int i = 0; i < SLAVES; i++) { ServerName server = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i).getServerName(); servers.add(server); } List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1); HRegionInfo region = new HRegionInfo(TableName.valueOf("foobar")); regions.add(region); ServerName serverBefore = balancer.randomAssignment(region, servers); List<ServerName> favoredNodesBefore = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(region); assertTrue(favoredNodesBefore.size() == 3); // the primary RS should be the one that the balancer's assignment returns assertTrue(ServerName.isSameHostnameAndPort(serverBefore,favoredNodesBefore.get(PRIMARY))); // now remove the primary from the list of servers removeMatchingServers(serverBefore, servers); // call randomAssignment with the modified servers list ServerName serverAfter = balancer.randomAssignment(region, servers); List<ServerName> favoredNodesAfter = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(region); assertTrue(favoredNodesAfter.size() == 3); // We don't expect the favored nodes assignments to change in multiple calls // to the randomAssignment method in the balancer (relevant for AssignmentManager.assign // failures) assertTrue(favoredNodesAfter.containsAll(favoredNodesBefore)); // We expect the new RegionServer assignee to be one of the favored nodes // chosen earlier. assertTrue(ServerName.isSameHostnameAndPort(serverAfter, favoredNodesBefore.get(SECONDARY)) || ServerName.isSameHostnameAndPort(serverAfter, favoredNodesBefore.get(TERTIARY))); // Make all the favored nodes unavailable for assignment removeMatchingServers(favoredNodesAfter, servers); // call randomAssignment with the modified servers list balancer.randomAssignment(region, servers); List<ServerName> favoredNodesNow = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(region); assertTrue(favoredNodesNow.size() == 3); assertTrue(!favoredNodesNow.contains(favoredNodesAfter.get(PRIMARY)) && !favoredNodesNow.contains(favoredNodesAfter.get(SECONDARY)) && !favoredNodesNow.contains(favoredNodesAfter.get(TERTIARY))); }
/** * Constructs a new assignment manager. * * @param server instance of HMaster this AM running inside * @param serverManager serverManager for associated HMaster * @param balancer implementation of {@link LoadBalancer} * @param service Executor service * @param metricsMaster metrics manager * @param tableLockManager TableLock manager * @throws KeeperException * @throws IOException */ public AssignmentManager(Server server, ServerManager serverManager, final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, final TableLockManager tableLockManager) throws KeeperException, IOException, CoordinatedStateException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; this.executorService = service; this.regionStateStore = new RegionStateStore(server); this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = server.getConfiguration(); // Only read favored nodes if using the favored nodes load balancer. this.shouldAssignRegionsWithFavoredNodes = conf.getClass( HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( FavoredNodeLoadBalancer.class); try { if (server.getCoordinatedStateManager() != null) { this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager(); } else { this.tableStateManager = null; } } catch (InterruptedException e) { throw new InterruptedIOException(); } // This is the max attempts, not retries, so it should be at least 1. this.maximumAttempts = Math.max(1, this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( "hbase.meta.assignment.retry.sleeptime", 1000l); this.balancer = balancer; int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); this.regionStates = new RegionStates( server, tableStateManager, serverManager, regionStateStore); this.bulkAssignWaitTillAllAssigned = conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker"); zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, TimeUnit.SECONDS, threadFactory); this.tableLockManager = tableLockManager; this.metricsAssignmentManager = new MetricsAssignmentManager(); useZKForAssignment = ConfigUtil.useZKForAssignment(conf); }
/** * Constructs a new assignment manager. * * @param server * @param serverManager * @param catalogTracker * @param service * @throws KeeperException * @throws IOException */ public AssignmentManager(Server server, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, final TableLockManager tableLockManager) throws KeeperException, IOException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = server.getConfiguration(); // Only read favored nodes if using the favored nodes load balancer. this.shouldAssignRegionsWithFavoredNodes = conf.getClass( HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( FavoredNodeLoadBalancer.class); this.tomActivated = conf.getBoolean( ASSIGNMENT_TIMEOUT_MANAGEMENT, DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT); if (tomActivated){ this.serversInUpdatingTimer = new ConcurrentSkipListSet<ServerName>(); this.timeoutMonitor = new TimeoutMonitor( conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000), server, serverManager, conf.getInt(ASSIGNMENT_TIMEOUT, DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT)); this.timerUpdater = new TimerUpdater(conf.getInt( "hbase.master.assignment.timerupdater.period", 10000), server); Threads.setDaemonThreadRunning(timerUpdater.getThread(), server.getServerName() + ".timerUpdater"); } else { this.serversInUpdatingTimer = null; this.timeoutMonitor = null; this.timerUpdater = null; } this.zkTable = new ZKTable(this.watcher); // This is the max attempts, not retries, so it should be at least 1. this.maximumAttempts = Math.max(1, this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( "hbase.meta.assignment.retry.sleeptime", 1000l); this.balancer = balancer; int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); this.regionStates = new RegionStates(server, serverManager); this.bulkAssignWaitTillAllAssigned = conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker"); zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, TimeUnit.SECONDS, threadFactory); this.tableLockManager = tableLockManager; this.metricsAssignmentManager = new MetricsAssignmentManager(); }
/** * Constructs a new assignment manager. * * @param server * @param serverManager * @param catalogTracker * @param service * @throws KeeperException * @throws IOException */ public AssignmentManager(Server server, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, final TableLockManager tableLockManager) throws KeeperException, IOException, CoordinatedStateException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; this.regionsToReopen = Collections.synchronizedMap (new HashMap<String, HRegionInfo> ()); Configuration conf = server.getConfiguration(); // Only read favored nodes if using the favored nodes load balancer. this.shouldAssignRegionsWithFavoredNodes = conf.getClass( HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( FavoredNodeLoadBalancer.class); try { if (server.getCoordinatedStateManager() != null) { this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager(); } else { this.tableStateManager = null; } } catch (InterruptedException e) { throw new InterruptedIOException(); } // This is the max attempts, not retries, so it should be at least 1. this.maximumAttempts = Math.max(1, this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( "hbase.meta.assignment.retry.sleeptime", 1000l); this.balancer = balancer; int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); this.regionStates = new RegionStates(server, serverManager); this.bulkAssignWaitTillAllAssigned = conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20); ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker"); zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, TimeUnit.SECONDS, threadFactory); this.tableLockManager = tableLockManager; this.metricsAssignmentManager = new MetricsAssignmentManager(); }