Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D1962200CB8 for ; Sat, 27 May 2017 02:12:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D03E2160BE5; Sat, 27 May 2017 00:12:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 56DC8160BE3 for ; Sat, 27 May 2017 02:12:38 +0200 (CEST) Received: (qmail 43505 invoked by uid 500); 27 May 2017 00:12:36 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 40761 invoked by uid 99); 27 May 2017 00:12:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 27 May 2017 00:12:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D7A12F21B0; Sat, 27 May 2017 00:12:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stack@apache.org To: commits@hbase.apache.org Date: Sat, 27 May 2017 00:12:53 -0000 Message-Id: In-Reply-To: <303f0926e111442eabb942b126f0b353@git.apache.org> References: <303f0926e111442eabb942b126f0b353@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/30] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility. archived-at: Sat, 27 May 2017 00:12:41 -0000 http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java deleted file mode 100644 index 69ebd97..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ /dev/null @@ -1,3053 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import com.google.common.annotations.VisibleForTesting; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CoordinatedStateException; -import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.NotServingRegionException; -import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.RegionStateListener; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.MasterSwitchType; -import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventHandler; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.executor.ExecutorService; -import org.apache.hadoop.hbase.favored.FavoredNodesManager; -import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; -import org.apache.hadoop.hbase.ipc.FailedServerException; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; -import org.apache.hadoop.hbase.master.RegionState.State; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.quotas.QuotaExceededException; -import org.apache.hadoop.hbase.regionserver.RegionOpeningState; -import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.KeyLocker; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.KeeperException; - -/** - * Manages and performs region assignment. - * Related communications with regionserver are all done over RPC. - */ -@InterfaceAudience.Private -public class AssignmentManager { - private static final Log LOG = LogFactory.getLog(AssignmentManager.class); - - protected final MasterServices server; - - private ServerManager serverManager; - - private boolean shouldAssignRegionsWithFavoredNodes; - - private LoadBalancer balancer; - - private final MetricsAssignmentManager metricsAssignmentManager; - - private AtomicInteger numRegionsOpened = new AtomicInteger(0); - - final private KeyLocker locker = new KeyLocker<>(); - - Set replicasToClose = Collections.synchronizedSet(new HashSet()); - - /** - * Map of regions to reopen after the schema of a table is changed. Key - - * encoded region name, value - HRegionInfo - */ - private final Map regionsToReopen; - - /* - * Maximum times we recurse an assignment/unassignment. - * See below in {@link #assign()} and {@link #unassign()}. - */ - private final int maximumAttempts; - - /** - * The sleep time for which the assignment will wait before retrying in case of - * hbase:meta assignment failure due to lack of availability of region plan or bad region plan - */ - private final long sleepTimeBeforeRetryingMetaAssignment; - - /** Plans for region movement. Key is the encoded version of a region name*/ - // TODO: When do plans get cleaned out? Ever? In server open and in server - // shutdown processing -- St.Ack - // All access to this Map must be synchronized. - final NavigableMap regionPlans = new TreeMap<>(); - - private final TableStateManager tableStateManager; - - private final ExecutorService executorService; - - private java.util.concurrent.ExecutorService threadPoolExecutorService; - private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; - - private final RegionStates regionStates; - - // The threshold to use bulk assigning. Using bulk assignment - // only if assigning at least this many regions to at least this - // many servers. If assigning fewer regions to fewer servers, - // bulk assigning may be not as efficient. - private final int bulkAssignThresholdRegions; - private final int bulkAssignThresholdServers; - private final int bulkPerRegionOpenTimeGuesstimate; - - // Should bulk assignment wait till all regions are assigned, - // or it is timed out? This is useful to measure bulk assignment - // performance, but not needed in most use cases. - private final boolean bulkAssignWaitTillAllAssigned; - - /** - * Indicator that AssignmentManager has recovered the region states so - * that ServerShutdownHandler can be fully enabled and re-assign regions - * of dead servers. So that when re-assignment happens, AssignmentManager - * has proper region states. - * - * Protected to ease testing. - */ - protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false); - - /** - * A map to track the count a region fails to open in a row. - * So that we don't try to open a region forever if the failure is - * unrecoverable. We don't put this information in region states - * because we don't expect this to happen frequently; we don't - * want to copy this information over during each state transition either. - */ - private final ConcurrentHashMap failedOpenTracker = new ConcurrentHashMap<>(); - - // In case not using ZK for region assignment, region states - // are persisted in meta with a state store - private final RegionStateStore regionStateStore; - - /** - * For testing only! Set to true to skip handling of split. - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") - public static boolean TEST_SKIP_SPLIT_HANDLING = false; - - /** Listeners that are called on assignment events. */ - private List listeners = new CopyOnWriteArrayList<>(); - - private RegionStateListener regionStateListener; - - private RetryCounter.BackoffPolicy backoffPolicy; - private RetryCounter.RetryConfig retryConfig; - /** - * Constructs a new assignment manager. - * - * @param server instance of HMaster this AM running inside - * @param serverManager serverManager for associated HMaster - * @param balancer implementation of {@link LoadBalancer} - * @param service Executor service - * @param metricsMaster metrics manager - * @throws IOException - */ - public AssignmentManager(MasterServices server, ServerManager serverManager, - final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, - final TableStateManager tableStateManager) - throws IOException { - this.server = server; - this.serverManager = serverManager; - this.executorService = service; - this.regionStateStore = new RegionStateStore(server); - this.regionsToReopen = Collections.synchronizedMap - (new HashMap ()); - Configuration conf = server.getConfiguration(); - - this.tableStateManager = tableStateManager; - - // This is the max attempts, not retries, so it should be at least 1. - this.maximumAttempts = Math.max(1, - this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); - this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( - "hbase.meta.assignment.retry.sleeptime", 1000l); - this.balancer = balancer; - // Only read favored nodes if using the favored nodes load balancer. - this.shouldAssignRegionsWithFavoredNodes = this.balancer instanceof FavoredNodesPromoter; - int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); - - this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( - maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); - - this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, - Threads.newDaemonThreadFactory("AM.Scheduler")); - - this.regionStates = new RegionStates( - server, tableStateManager, serverManager, regionStateStore); - - this.bulkAssignWaitTillAllAssigned = - conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); - this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); - this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); - this.bulkPerRegionOpenTimeGuesstimate = - conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); - - this.metricsAssignmentManager = new MetricsAssignmentManager(); - - // Configurations for retrying opening a region on receiving a FAILED_OPEN - this.retryConfig = new RetryCounter.RetryConfig(); - this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l)); - // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy - // if the user does not set a max sleep limit - this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max", - retryConfig.getSleepInterval())); - this.backoffPolicy = getBackoffPolicy(); - } - - /** - * Returns the backoff policy used for Failed Region Open retries - * @return the backoff policy used for Failed Region Open retries - */ - RetryCounter.BackoffPolicy getBackoffPolicy() { - return new RetryCounter.ExponentialBackoffPolicyWithLimit(); - } - - MetricsAssignmentManager getAssignmentManagerMetrics() { - return this.metricsAssignmentManager; - } - - /** - * Add the listener to the notification list. - * @param listener The AssignmentListener to register - */ - public void registerListener(final AssignmentListener listener) { - this.listeners.add(listener); - } - - /** - * Remove the listener from the notification list. - * @param listener The AssignmentListener to unregister - */ - public boolean unregisterListener(final AssignmentListener listener) { - return this.listeners.remove(listener); - } - - /** - * @return Instance of ZKTableStateManager. - */ - public TableStateManager getTableStateManager() { - // These are 'expensive' to make involving trip to zk ensemble so allow - // sharing. - return this.tableStateManager; - } - - /** - * This SHOULD not be public. It is public now - * because of some unit tests. - * - * TODO: make it package private and keep RegionStates in the master package - */ - public RegionStates getRegionStates() { - return regionStates; - } - - /** - * Used in some tests to mock up region state in meta - */ - @VisibleForTesting - RegionStateStore getRegionStateStore() { - return regionStateStore; - } - - public RegionPlan getRegionReopenPlan(HRegionInfo hri) { - return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri)); - } - - /** - * Add a regionPlan for the specified region. - * @param encodedName - * @param plan - */ - public void addPlan(String encodedName, RegionPlan plan) { - synchronized (regionPlans) { - regionPlans.put(encodedName, plan); - } - } - - /** - * Add a map of region plans. - */ - public void addPlans(Map plans) { - synchronized (regionPlans) { - regionPlans.putAll(plans); - } - } - - /** - * Set the list of regions that will be reopened - * because of an update in table schema - * - * @param regions - * list of regions that should be tracked for reopen - */ - public void setRegionsToReopen(List regions) { - for(HRegionInfo hri : regions) { - regionsToReopen.put(hri.getEncodedName(), hri); - } - } - - /** - * Used by the client to identify if all regions have the schema updates - * - * @param tableName - * @return Pair indicating the status of the alter command - * @throws IOException - */ - public Pair getReopenStatus(TableName tableName) - throws IOException { - List hris; - if (TableName.META_TABLE_NAME.equals(tableName)) { - hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper()); - } else { - hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true); - } - - Integer pending = 0; - for (HRegionInfo hri : hris) { - String name = hri.getEncodedName(); - // no lock concurrent access ok: sequential consistency respected. - if (regionsToReopen.containsKey(name) - || regionStates.isRegionInTransition(name)) { - pending++; - } - } - return new Pair<>(pending, hris.size()); - } - - /** - * Used by ServerShutdownHandler to make sure AssignmentManager has completed - * the failover cleanup before re-assigning regions of dead servers. So that - * when re-assignment happens, AssignmentManager has proper region states. - */ - public boolean isFailoverCleanupDone() { - return failoverCleanupDone.get(); - } - - /** - * To avoid racing with AM, external entities may need to lock a region, - * for example, when SSH checks what regions to skip re-assigning. - */ - public Lock acquireRegionLock(final String encodedName) { - return locker.acquireLock(encodedName); - } - - /** - * Now, failover cleanup is completed. Notify server manager to - * process queued up dead servers processing, if any. - */ - void failoverCleanupDone() { - failoverCleanupDone.set(true); - serverManager.processQueuedDeadServers(); - } - - /** - * Called on startup. - * Figures whether a fresh cluster start of we are joining extant running cluster. - * @throws IOException - * @throws KeeperException - * @throws InterruptedException - * @throws CoordinatedStateException - */ - void joinCluster() - throws IOException, KeeperException, InterruptedException, CoordinatedStateException { - long startTime = System.currentTimeMillis(); - // Concurrency note: In the below the accesses on regionsInTransition are - // outside of a synchronization block where usually all accesses to RIT are - // synchronized. The presumption is that in this case it is safe since this - // method is being played by a single thread on startup. - - // TODO: Regions that have a null location and are not in regionsInTransitions - // need to be handled. - - // Scan hbase:meta to build list of existing regions, servers, and assignment - // Returns servers who have not checked in (assumed dead) that some regions - // were assigned to (according to the meta) - Set deadServers = rebuildUserRegions(); - - // This method will assign all user regions if a clean server startup or - // it will reconstruct master state and cleanup any leftovers from previous master process. - boolean failover = processDeadServersAndRegionsInTransition(deadServers); - - LOG.info("Joined the cluster in " + (System.currentTimeMillis() - - startTime) + "ms, failover=" + failover); - } - - /** - * Process all regions that are in transition in zookeeper and also - * processes the list of dead servers. - * Used by master joining an cluster. If we figure this is a clean cluster - * startup, will assign all user regions. - * @param deadServers Set of servers that are offline probably legitimately that were carrying - * regions according to a scan of hbase:meta. Can be null. - * @throws IOException - * @throws InterruptedException - */ - boolean processDeadServersAndRegionsInTransition(final Set deadServers) - throws KeeperException, IOException, InterruptedException, CoordinatedStateException { - // TODO Needed? List nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode); - boolean failover = !serverManager.getDeadServers().isEmpty(); - if (failover) { - // This may not be a failover actually, especially if meta is on this master. - if (LOG.isDebugEnabled()) { - LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers()); - } - // Check if there are any regions on these servers - failover = false; - for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) { - if (regionStates.getRegionAssignments().values().contains(serverName)) { - LOG.debug("Found regions on dead server: " + serverName); - failover = true; - break; - } - } - } - Set onlineServers = serverManager.getOnlineServers().keySet(); - if (!failover) { - // If any one region except meta is assigned, it's a failover. - for (Map.Entry en: - regionStates.getRegionAssignments().entrySet()) { - HRegionInfo hri = en.getKey(); - if (!hri.isMetaTable() - && onlineServers.contains(en.getValue())) { - LOG.debug("Found region " + hri + " out on cluster"); - failover = true; - break; - } - } - } - if (!failover) { - // If any region except meta is in transition on a live server, it's a failover. - Set regionsInTransition = regionStates.getRegionsInTransition(); - if (!regionsInTransition.isEmpty()) { - for (RegionState regionState: regionsInTransition) { - ServerName serverName = regionState.getServerName(); - if (!regionState.getRegion().isMetaRegion() - && serverName != null && onlineServers.contains(serverName)) { - LOG.debug("Found " + regionState + " for region " + - regionState.getRegion().getRegionNameAsString() + " for server " + - serverName + "in RITs"); - failover = true; - break; - } - } - } - } - if (!failover) { - // If we get here, we have a full cluster restart. It is a failover only - // if there are some WALs are not split yet. For meta WALs, they should have - // been split already, if any. We can walk through those queued dead servers, - // if they don't have any WALs, this restart should be considered as a clean one - Set queuedDeadServers = serverManager.getRequeuedDeadServers().keySet(); - if (!queuedDeadServers.isEmpty()) { - Configuration conf = server.getConfiguration(); - Path walRootDir = FSUtils.getWALRootDir(conf); - FileSystem walFs = FSUtils.getWALFileSystem(conf); - for (ServerName serverName: queuedDeadServers) { - // In the case of a clean exit, the shutdown handler would have presplit any WALs and - // removed empty directories. - Path walDir = new Path(walRootDir, - AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); - Path splitDir = walDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); - if (checkWals(walFs, walDir) || checkWals(walFs, splitDir)) { - LOG.debug("Found queued dead server " + serverName); - failover = true; - break; - } - } - if (!failover) { - // We figured that it's not a failover, so no need to - // work on these re-queued dead servers any more. - LOG.info("AM figured that it's not a failover and cleaned up " - + queuedDeadServers.size() + " queued dead servers"); - serverManager.removeRequeuedDeadServers(); - } - } - } - - Set disabledOrDisablingOrEnabling = null; - Map allRegions = null; - - if (!failover) { - disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates( - TableState.State.DISABLED, TableState.State.DISABLING, - TableState.State.ENABLING); - - // Clean re/start, mark all user regions closed before reassignment - allRegions = regionStates.closeAllUserRegions( - disabledOrDisablingOrEnabling); - } - - // Now region states are restored - regionStateStore.start(); - - if (failover) { - if (deadServers != null && !deadServers.isEmpty()) { - for (ServerName serverName: deadServers) { - if (!serverManager.isServerDead(serverName)) { - serverManager.expireServer(serverName); // Let SSH do region re-assign - } - } - } - processRegionsInTransition(regionStates.getRegionsInTransition()); - } - - // Now we can safely claim failover cleanup completed and enable - // ServerShutdownHandler for further processing. The nodes (below) - // in transition, if any, are for regions not related to those - // dead servers at all, and can be done in parallel to SSH. - failoverCleanupDone(); - if (!failover) { - // Fresh cluster startup. - LOG.info("Clean cluster startup. Don't reassign user regions"); - assignAllUserRegions(allRegions); - } else { - LOG.info("Failover! Reassign user regions"); - } - // unassign replicas of the split parents and the merged regions - // the daughter replicas are opened in assignAllUserRegions if it was - // not already opened. - for (HRegionInfo h : replicasToClose) { - unassign(h); - } - replicasToClose.clear(); - return failover; - } - - private boolean checkWals(FileSystem fs, Path dir) throws IOException { - if (!fs.exists(dir)) { - LOG.debug(dir + " doesn't exist"); - return false; - } - if (!fs.getFileStatus(dir).isDirectory()) { - LOG.warn(dir + " is not a directory"); - return false; - } - FileStatus[] files = FSUtils.listStatus(fs, dir); - if (files == null || files.length == 0) { - LOG.debug(dir + " has no files"); - return false; - } - for (int i = 0; i < files.length; i++) { - if (files[i].isFile() && files[i].getLen() > 0) { - LOG.debug(dir + " has a non-empty file: " + files[i].getPath()); - return true; - } else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) { - LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath()); - return true; - } - } - LOG.debug("Found 0 non-empty wal files for :" + dir); - return false; - } - - /** - * When a region is closed, it should be removed from the regionsToReopen - * @param hri HRegionInfo of the region which was closed - */ - public void removeClosedRegion(HRegionInfo hri) { - if (regionsToReopen.remove(hri.getEncodedName()) != null) { - LOG.debug("Removed region from reopening regions because it was closed"); - } - } - - void processFavoredNodesForDaughters(HRegionInfo parent, - HRegionInfo regionA, HRegionInfo regionB) throws IOException { - if (shouldAssignFavoredNodes(parent)) { - List onlineServers = this.serverManager.getOnlineServersList(); - ((FavoredNodesPromoter) this.balancer). - generateFavoredNodesForDaughter(onlineServers, parent, regionA, regionB); - } - } - - void processFavoredNodesForMerge(HRegionInfo merged, HRegionInfo regionA, HRegionInfo regionB) - throws IOException { - if (shouldAssignFavoredNodes(merged)) { - ((FavoredNodesPromoter)this.balancer). - generateFavoredNodesForMergedRegion(merged, regionA, regionB); - } - } - - /* - * Favored nodes should be applied only when FavoredNodes balancer is configured and the region - * belongs to a non-system table. - */ - private boolean shouldAssignFavoredNodes(HRegionInfo region) { - return this.shouldAssignRegionsWithFavoredNodes - && FavoredNodesManager.isFavoredNodeApplicable(region); - } - - /** - * Marks the region as online. Removes it from regions in transition and - * updates the in-memory assignment information. - *

- * Used when a region has been successfully opened on a region server. - * @param regionInfo - * @param sn - */ - void regionOnline(HRegionInfo regionInfo, ServerName sn) { - regionOnline(regionInfo, sn, HConstants.NO_SEQNUM); - } - - void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) { - numRegionsOpened.incrementAndGet(); - regionStates.regionOnline(regionInfo, sn, openSeqNum); - - // Remove plan if one. - clearRegionPlan(regionInfo); - balancer.regionOnline(regionInfo, sn); - - // Tell our listeners that a region was opened - sendRegionOpenedNotification(regionInfo, sn); - } - - /** - * Marks the region as offline. Removes it from regions in transition and - * removes in-memory assignment information. - *

- * Used when a region has been closed and should remain closed. - * @param regionInfo - */ - public void regionOffline(final HRegionInfo regionInfo) { - regionOffline(regionInfo, null); - } - - public void offlineDisabledRegion(HRegionInfo regionInfo) { - replicasToClose.remove(regionInfo); - regionOffline(regionInfo); - } - - // Assignment methods - - /** - * Assigns the specified region. - *

- * If a RegionPlan is available with a valid destination then it will be used - * to determine what server region is assigned to. If no RegionPlan is - * available, region will be assigned to a random available server. - *

- * Updates the RegionState and sends the OPEN RPC. - *

- * This will only succeed if the region is in transition and in a CLOSED or - * OFFLINE state or not in transition, and of course, the - * chosen server is up and running (It may have just crashed!). - * - * @param region server to be assigned - */ - public void assign(HRegionInfo region) { - assign(region, false); - } - - /** - * Use care with forceNewPlan. It could cause double assignment. - */ - public void assign(HRegionInfo region, boolean forceNewPlan) { - if (isDisabledorDisablingRegionInRIT(region)) { - return; - } - String encodedName = region.getEncodedName(); - Lock lock = locker.acquireLock(encodedName); - try { - RegionState state = forceRegionStateToOffline(region, forceNewPlan); - if (state != null) { - if (regionStates.wasRegionOnDeadServer(encodedName)) { - LOG.info("Skip assigning " + region.getRegionNameAsString() - + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName) - + " is dead but not processed yet"); - return; - } - assign(state, forceNewPlan); - } - } finally { - lock.unlock(); - } - } - - /** - * Bulk assign regions to destination. - * @param destination - * @param regions Regions to assign. - * @return true if successful - */ - boolean assign(final ServerName destination, final List regions) - throws InterruptedException { - long startTime = EnvironmentEdgeManager.currentTime(); - try { - int regionCount = regions.size(); - if (regionCount == 0) { - return true; - } - LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString()); - Set encodedNames = new HashSet<>(regionCount); - for (HRegionInfo region : regions) { - encodedNames.add(region.getEncodedName()); - } - - List failedToOpenRegions = new ArrayList<>(); - Map locks = locker.acquireLocks(encodedNames); - try { - Map plans = new HashMap<>(regionCount); - List states = new ArrayList<>(regionCount); - for (HRegionInfo region : regions) { - String encodedName = region.getEncodedName(); - if (!isDisabledorDisablingRegionInRIT(region)) { - RegionState state = forceRegionStateToOffline(region, false); - boolean onDeadServer = false; - if (state != null) { - if (regionStates.wasRegionOnDeadServer(encodedName)) { - LOG.info("Skip assigning " + region.getRegionNameAsString() - + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName) - + " is dead but not processed yet"); - onDeadServer = true; - } else { - RegionPlan plan = new RegionPlan(region, state.getServerName(), destination); - plans.put(encodedName, plan); - states.add(state); - continue; - } - } - // Reassign if the region wasn't on a dead server - if (!onDeadServer) { - LOG.info("failed to force region state to offline, " - + "will reassign later: " + region); - failedToOpenRegions.add(region); // assign individually later - } - } - // Release the lock, this region is excluded from bulk assign because - // we can't update its state, or set its znode to offline. - Lock lock = locks.remove(encodedName); - lock.unlock(); - } - - if (server.isStopped()) { - return false; - } - - // Add region plans, so we can updateTimers when one region is opened so - // that unnecessary timeout on RIT is reduced. - this.addPlans(plans); - - List>> regionOpenInfos = new ArrayList<>(states.size()); - for (RegionState state: states) { - HRegionInfo region = state.getRegion(); - regionStates.updateRegionState( - region, State.PENDING_OPEN, destination); - List favoredNodes = ServerName.EMPTY_SERVER_LIST; - if (shouldAssignFavoredNodes(region)) { - favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region); - } - regionOpenInfos.add(new Pair<>(region, favoredNodes)); - } - - // Move on to open regions. - try { - // Send OPEN RPC. If it fails on a IOE or RemoteException, - // regions will be assigned individually. - Configuration conf = server.getConfiguration(); - long maxWaitTime = System.currentTimeMillis() + - conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000); - for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) { - try { - List regionOpeningStateList = serverManager - .sendRegionOpen(destination, regionOpenInfos); - for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) { - RegionOpeningState openingState = regionOpeningStateList.get(k); - if (openingState != RegionOpeningState.OPENED) { - HRegionInfo region = regionOpenInfos.get(k).getFirst(); - LOG.info("Got opening state " + openingState - + ", will reassign later: " + region); - // Failed opening this region, reassign it later - forceRegionStateToOffline(region, true); - failedToOpenRegions.add(region); - } - } - break; - } catch (IOException e) { - if (e instanceof RemoteException) { - e = ((RemoteException)e).unwrapRemoteException(); - } - if (e instanceof RegionServerStoppedException) { - LOG.warn("The region server was shut down, ", e); - // No need to retry, the region server is a goner. - return false; - } else if (e instanceof ServerNotRunningYetException) { - long now = System.currentTimeMillis(); - if (now < maxWaitTime) { - if (LOG.isDebugEnabled()) { - LOG.debug("Server is not yet up; waiting up to " + - (maxWaitTime - now) + "ms", e); - } - Thread.sleep(100); - i--; // reset the try count - continue; - } - } else if (e instanceof java.net.SocketTimeoutException - && this.serverManager.isServerOnline(destination)) { - // In case socket is timed out and the region server is still online, - // the openRegion RPC could have been accepted by the server and - // just the response didn't go through. So we will retry to - // open the region on the same server. - if (LOG.isDebugEnabled()) { - LOG.debug("Bulk assigner openRegion() to " + destination - + " has timed out, but the regions might" - + " already be opened on it.", e); - } - // wait and reset the re-try count, server might be just busy. - Thread.sleep(100); - i--; - continue; - } else if (e instanceof FailedServerException && i < maximumAttempts) { - // In case the server is in the failed server list, no point to - // retry too soon. Retry after the failed_server_expiry time - long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, - RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); - if (LOG.isDebugEnabled()) { - LOG.debug(destination + " is on failed server list; waiting " - + sleepTime + "ms", e); - } - Thread.sleep(sleepTime); - continue; - } - throw e; - } - } - } catch (IOException e) { - // Can be a socket timeout, EOF, NoRouteToHost, etc - LOG.info("Unable to communicate with " + destination - + " in order to assign regions, ", e); - for (RegionState state: states) { - HRegionInfo region = state.getRegion(); - forceRegionStateToOffline(region, true); - } - return false; - } - } finally { - for (Lock lock : locks.values()) { - lock.unlock(); - } - } - - if (!failedToOpenRegions.isEmpty()) { - for (HRegionInfo region : failedToOpenRegions) { - if (!regionStates.isRegionOnline(region)) { - invokeAssign(region); - } - } - } - - // wait for assignment completion - ArrayList userRegionSet = new ArrayList<>(regions.size()); - for (HRegionInfo region: regions) { - if (!region.getTable().isSystemTable()) { - userRegionSet.add(region); - } - } - if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), - System.currentTimeMillis())) { - LOG.debug("some user regions are still in transition: " + userRegionSet); - } - LOG.debug("Bulk assigning done for " + destination); - return true; - } finally { - metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime); - } - } - - /** - * Send CLOSE RPC if the server is online, otherwise, offline the region. - * - * The RPC will be sent only to the region sever found in the region state - * if it is passed in, otherwise, to the src server specified. If region - * state is not specified, we don't update region state at all, instead - * we just send the RPC call. This is useful for some cleanup without - * messing around the region states (see handleRegion, on region opened - * on an unexpected server scenario, for an example) - */ - private void unassign(final HRegionInfo region, - final ServerName server, final ServerName dest) { - for (int i = 1; i <= this.maximumAttempts; i++) { - if (this.server.isStopped() || this.server.isAborted()) { - LOG.debug("Server stopped/aborted; skipping unassign of " + region); - return; - } - if (!serverManager.isServerOnline(server)) { - LOG.debug("Offline " + region.getRegionNameAsString() - + ", no need to unassign since it's on a dead server: " + server); - regionStates.updateRegionState(region, State.OFFLINE); - return; - } - try { - // Send CLOSE RPC - if (serverManager.sendRegionClose(server, region, dest)) { - LOG.debug("Sent CLOSE to " + server + " for region " + - region.getRegionNameAsString()); - return; - } - // This never happens. Currently regionserver close always return true. - // Todo; this can now happen (0.96) if there is an exception in a coprocessor - LOG.warn("Server " + server + " region CLOSE RPC returned false for " + - region.getRegionNameAsString()); - } catch (Throwable t) { - long sleepTime = 0; - Configuration conf = this.server.getConfiguration(); - if (t instanceof RemoteException) { - t = ((RemoteException)t).unwrapRemoteException(); - } - if (t instanceof RegionServerAbortedException - || t instanceof RegionServerStoppedException - || t instanceof ServerNotRunningYetException) { - // RS is aborting, we cannot offline the region since the region may need to do WAL - // recovery. Until we see the RS expiration, we should retry. - sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, - RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); - - } else if (t instanceof NotServingRegionException) { - LOG.debug("Offline " + region.getRegionNameAsString() - + ", it's not any more on " + server, t); - regionStates.updateRegionState(region, State.OFFLINE); - return; - } else if (t instanceof FailedServerException && i < maximumAttempts) { - // In case the server is in the failed server list, no point to - // retry too soon. Retry after the failed_server_expiry time - sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, - RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); - if (LOG.isDebugEnabled()) { - LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t); - } - } - try { - if (sleepTime > 0) { - Thread.sleep(sleepTime); - } - } catch (InterruptedException ie) { - LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie); - Thread.currentThread().interrupt(); - regionStates.updateRegionState(region, State.FAILED_CLOSE); - return; - } - LOG.info("Server " + server + " returned " + t + " for " - + region.getRegionNameAsString() + ", try=" + i - + " of " + this.maximumAttempts, t); - } - } - // Run out of attempts - regionStates.updateRegionState(region, State.FAILED_CLOSE); - } - - /** - * Set region to OFFLINE unless it is opening and forceNewPlan is false. - */ - private RegionState forceRegionStateToOffline( - final HRegionInfo region, final boolean forceNewPlan) { - RegionState state = regionStates.getRegionState(region); - if (state == null) { - LOG.warn("Assigning but not in region states: " + region); - state = regionStates.createRegionState(region); - } - - if (forceNewPlan && LOG.isDebugEnabled()) { - LOG.debug("Force region state offline " + state); - } - - switch (state.getState()) { - case OPEN: - case OPENING: - case PENDING_OPEN: - case CLOSING: - case PENDING_CLOSE: - if (!forceNewPlan) { - LOG.debug("Skip assigning " + - region + ", it is already " + state); - return null; - } - case FAILED_CLOSE: - case FAILED_OPEN: - regionStates.updateRegionState(region, State.PENDING_CLOSE); - unassign(region, state.getServerName(), null); - state = regionStates.getRegionState(region); - if (!state.isOffline() && !state.isClosed()) { - // If the region isn't offline, we can't re-assign - // it now. It will be assigned automatically after - // the regionserver reports it's closed. - return null; - } - case OFFLINE: - case CLOSED: - break; - default: - LOG.error("Trying to assign region " + region - + ", which is " + state); - return null; - } - return state; - } - - /** - * Caller must hold lock on the passed state object. - * @param state - * @param forceNewPlan - */ - private void assign(RegionState state, boolean forceNewPlan) { - long startTime = EnvironmentEdgeManager.currentTime(); - try { - Configuration conf = server.getConfiguration(); - RegionPlan plan = null; - long maxWaitTime = -1; - HRegionInfo region = state.getRegion(); - Throwable previousException = null; - for (int i = 1; i <= maximumAttempts; i++) { - if (server.isStopped() || server.isAborted()) { - LOG.info("Skip assigning " + region.getRegionNameAsString() - + ", the server is stopped/aborted"); - return; - } - - if (plan == null) { // Get a server for the region at first - try { - plan = getRegionPlan(region, forceNewPlan); - } catch (HBaseIOException e) { - LOG.warn("Failed to get region plan", e); - } - } - - if (plan == null) { - LOG.warn("Unable to determine a plan to assign " + region); - - // For meta region, we have to keep retrying until succeeding - if (region.isMetaRegion()) { - if (i == maximumAttempts) { - i = 0; // re-set attempt count to 0 for at least 1 retry - - LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region + - " after maximumAttempts (" + this.maximumAttempts + - "). Reset attempts count and continue retrying."); - } - waitForRetryingMetaAssignment(); - continue; - } - - regionStates.updateRegionState(region, State.FAILED_OPEN); - return; - } - LOG.info("Assigning " + region.getRegionNameAsString() + - " to " + plan.getDestination()); - // Transition RegionState to PENDING_OPEN - regionStates.updateRegionState(region, - State.PENDING_OPEN, plan.getDestination()); - - boolean needNewPlan = false; - final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() + - " to " + plan.getDestination(); - try { - List favoredNodes = ServerName.EMPTY_SERVER_LIST; - if (shouldAssignFavoredNodes(region)) { - favoredNodes = server.getFavoredNodesManager().getFavoredNodesWithDNPort(region); - } - serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes); - return; // we're done - } catch (Throwable t) { - if (t instanceof RemoteException) { - t = ((RemoteException) t).unwrapRemoteException(); - } - previousException = t; - - // Should we wait a little before retrying? If the server is starting it's yes. - boolean hold = (t instanceof ServerNotRunningYetException); - - // In case socket is timed out and the region server is still online, - // the openRegion RPC could have been accepted by the server and - // just the response didn't go through. So we will retry to - // open the region on the same server. - boolean retry = !hold && (t instanceof java.net.SocketTimeoutException - && this.serverManager.isServerOnline(plan.getDestination())); - - if (hold) { - LOG.warn(assignMsg + ", waiting a little before trying on the same region server " + - "try=" + i + " of " + this.maximumAttempts, t); - - if (maxWaitTime < 0) { - maxWaitTime = EnvironmentEdgeManager.currentTime() - + this.server.getConfiguration().getLong( - "hbase.regionserver.rpc.startup.waittime", 60000); - } - try { - long now = EnvironmentEdgeManager.currentTime(); - if (now < maxWaitTime) { - if (LOG.isDebugEnabled()) { - LOG.debug("Server is not yet up; waiting up to " - + (maxWaitTime - now) + "ms", t); - } - Thread.sleep(100); - i--; // reset the try count - } else { - LOG.debug("Server is not up for a while; try a new one", t); - needNewPlan = true; - } - } catch (InterruptedException ie) { - LOG.warn("Failed to assign " - + region.getRegionNameAsString() + " since interrupted", ie); - regionStates.updateRegionState(region, State.FAILED_OPEN); - Thread.currentThread().interrupt(); - return; - } - } else if (retry) { - i--; // we want to retry as many times as needed as long as the RS is not dead. - if (LOG.isDebugEnabled()) { - LOG.debug(assignMsg + ", trying to assign to the same region server due ", t); - } - } else { - needNewPlan = true; - LOG.warn(assignMsg + ", trying to assign elsewhere instead;" + - " try=" + i + " of " + this.maximumAttempts, t); - } - } - - if (i == this.maximumAttempts) { - // For meta region, we have to keep retrying until succeeding - if (region.isMetaRegion()) { - i = 0; // re-set attempt count to 0 for at least 1 retry - LOG.warn(assignMsg + - ", trying to assign a hbase:meta region reached to maximumAttempts (" + - this.maximumAttempts + "). Reset attempt counts and continue retrying."); - waitForRetryingMetaAssignment(); - } - else { - // Don't reset the region state or get a new plan any more. - // This is the last try. - continue; - } - } - - // If region opened on destination of present plan, reassigning to new - // RS may cause double assignments. In case of RegionAlreadyInTransitionException - // reassigning to same RS. - if (needNewPlan) { - // Force a new plan and reassign. Will return null if no servers. - // The new plan could be the same as the existing plan since we don't - // exclude the server of the original plan, which should not be - // excluded since it could be the only server up now. - RegionPlan newPlan = null; - try { - newPlan = getRegionPlan(region, true); - } catch (HBaseIOException e) { - LOG.warn("Failed to get region plan", e); - } - if (newPlan == null) { - regionStates.updateRegionState(region, State.FAILED_OPEN); - LOG.warn("Unable to find a viable location to assign region " + - region.getRegionNameAsString()); - return; - } - - if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) { - // Clean out plan we failed execute and one that doesn't look like it'll - // succeed anyways; we need a new plan! - // Transition back to OFFLINE - regionStates.updateRegionState(region, State.OFFLINE); - plan = newPlan; - } else if(plan.getDestination().equals(newPlan.getDestination()) && - previousException instanceof FailedServerException) { - try { - LOG.info("Trying to re-assign " + region.getRegionNameAsString() + - " to the same failed server."); - Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, - RpcClient.FAILED_SERVER_EXPIRY_DEFAULT)); - } catch (InterruptedException ie) { - LOG.warn("Failed to assign " - + region.getRegionNameAsString() + " since interrupted", ie); - regionStates.updateRegionState(region, State.FAILED_OPEN); - Thread.currentThread().interrupt(); - return; - } - } - } - } - // Run out of attempts - regionStates.updateRegionState(region, State.FAILED_OPEN); - } finally { - metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime); - } - } - - private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) { - if (this.tableStateManager.isTableState(region.getTable(), - TableState.State.DISABLED, - TableState.State.DISABLING) || replicasToClose.contains(region)) { - LOG.info("Table " + region.getTable() + " is disabled or disabling;" - + " skipping assign of " + region.getRegionNameAsString()); - offlineDisabledRegion(region); - return true; - } - return false; - } - - /** - * @param region the region to assign - * @param forceNewPlan If true, then if an existing plan exists, a new plan - * will be generated. - * @return Plan for passed region (If none currently, it creates one or - * if no servers to assign, it returns null). - */ - private RegionPlan getRegionPlan(final HRegionInfo region, - final boolean forceNewPlan) throws HBaseIOException { - // Pickup existing plan or make a new one - final String encodedName = region.getEncodedName(); - final List destServers = - serverManager.createDestinationServersList(); - - if (destServers.isEmpty()){ - LOG.warn("Can't move " + encodedName + - ", there is no destination server available."); - return null; - } - - RegionPlan randomPlan = null; - boolean newPlan = false; - RegionPlan existingPlan; - - synchronized (this.regionPlans) { - existingPlan = this.regionPlans.get(encodedName); - - if (existingPlan != null && existingPlan.getDestination() != null) { - LOG.debug("Found an existing plan for " + region.getRegionNameAsString() - + " destination server is " + existingPlan.getDestination() + - " accepted as a dest server = " + destServers.contains(existingPlan.getDestination())); - } - - if (forceNewPlan - || existingPlan == null - || existingPlan.getDestination() == null - || !destServers.contains(existingPlan.getDestination())) { - newPlan = true; - try { - randomPlan = new RegionPlan(region, null, - balancer.randomAssignment(region, destServers)); - } catch (IOException ex) { - LOG.warn("Failed to create new plan.",ex); - return null; - } - this.regionPlans.put(encodedName, randomPlan); - } - } - - if (newPlan) { - if (randomPlan.getDestination() == null) { - LOG.warn("Can't find a destination for " + encodedName); - return null; - } - if (LOG.isDebugEnabled()) { - LOG.debug("No previous transition plan found (or ignoring " + - "an existing plan) for " + region.getRegionNameAsString() + - "; generated random plan=" + randomPlan + "; " + destServers.size() + - " (online=" + serverManager.getOnlineServers().size() + - ") available servers, forceNewPlan=" + forceNewPlan); - } - return randomPlan; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Using pre-existing plan for " + - region.getRegionNameAsString() + "; plan=" + existingPlan); - } - return existingPlan; - } - - /** - * Wait for some time before retrying meta table region assignment - */ - private void waitForRetryingMetaAssignment() { - try { - Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment); - } catch (InterruptedException e) { - LOG.error("Got exception while waiting for hbase:meta assignment"); - Thread.currentThread().interrupt(); - } - } - - /** - * Unassigns the specified region. - *

- * Updates the RegionState and sends the CLOSE RPC unless region is being - * split by regionserver; then the unassign fails (silently) because we - * presume the region being unassigned no longer exists (its been split out - * of existence). TODO: What to do if split fails and is rolled back and - * parent is revivified? - *

- * If a RegionPlan is already set, it will remain. - * - * @param region server to be unassigned - */ - public void unassign(HRegionInfo region) { - unassign(region, null); - } - - - /** - * Unassigns the specified region. - *

- * Updates the RegionState and sends the CLOSE RPC unless region is being - * split by regionserver; then the unassign fails (silently) because we - * presume the region being unassigned no longer exists (its been split out - * of existence). TODO: What to do if split fails and is rolled back and - * parent is revivified? - *

- * If a RegionPlan is already set, it will remain. - * - * @param region server to be unassigned - * @param dest the destination server of the region - */ - public void unassign(HRegionInfo region, ServerName dest) { - // TODO: Method needs refactoring. Ugly buried returns throughout. Beware! - LOG.debug("Starting unassign of " + region.getRegionNameAsString() - + " (offlining), current state: " + regionStates.getRegionState(region)); - - String encodedName = region.getEncodedName(); - // Grab the state of this region and synchronize on it - // We need a lock here as we're going to do a put later and we don't want multiple states - // creation - ReentrantLock lock = locker.acquireLock(encodedName); - RegionState state = regionStates.getRegionTransitionState(encodedName); - try { - if (state == null || state.isFailedClose()) { - if (state == null) { - // Region is not in transition. - // We can unassign it only if it's not SPLIT/MERGED. - state = regionStates.getRegionState(encodedName); - if (state != null && state.isUnassignable()) { - LOG.info("Attempting to unassign " + state + ", ignored"); - // Offline region will be reassigned below - return; - } - if (state == null || state.getServerName() == null) { - // We don't know where the region is, offline it. - // No need to send CLOSE RPC - LOG.warn("Attempting to unassign a region not in RegionStates " - + region.getRegionNameAsString() + ", offlined"); - regionOffline(region); - return; - } - } - state = regionStates.updateRegionState( - region, State.PENDING_CLOSE); - } else if (state.isFailedOpen()) { - // The region is not open yet - regionOffline(region); - return; - } else { - LOG.debug("Attempting to unassign " + - region.getRegionNameAsString() + " but it is " + - "already in transition (" + state.getState()); - return; - } - - unassign(region, state.getServerName(), dest); - } finally { - lock.unlock(); - - // Region is expected to be reassigned afterwards - if (!replicasToClose.contains(region) - && regionStates.isRegionInState(region, State.OFFLINE)) { - assign(region); - } - } - } - - /** - * Used by unit tests. Return the number of regions opened so far in the life - * of the master. Increases by one every time the master opens a region - * @return the counter value of the number of regions opened so far - */ - public int getNumRegionsOpened() { - return numRegionsOpened.get(); - } - - /** - * Waits until the specified region has completed assignment. - *

- * If the region is already assigned, returns immediately. Otherwise, method - * blocks until the region is assigned. - * @param regionInfo region to wait on assignment for - * @return true if the region is assigned false otherwise. - * @throws InterruptedException - */ - public boolean waitForAssignment(HRegionInfo regionInfo) - throws InterruptedException { - ArrayList regionSet = new ArrayList<>(1); - regionSet.add(regionInfo); - return waitForAssignment(regionSet, true, Long.MAX_VALUE); - } - - /** - * Waits until the specified region has completed assignment, or the deadline is reached. - */ - protected boolean waitForAssignment(final Collection regionSet, - final boolean waitTillAllAssigned, final int reassigningRegions, - final long minEndTime) throws InterruptedException { - long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1); - if (deadline < 0) { // Overflow - deadline = Long.MAX_VALUE; // wait forever - } - return waitForAssignment(regionSet, waitTillAllAssigned, deadline); - } - - /** - * Waits until the specified region has completed assignment, or the deadline is reached. - * @param regionSet set of region to wait on. the set is modified and the assigned regions removed - * @param waitTillAllAssigned true if we should wait all the regions to be assigned - * @param deadline the timestamp after which the wait is aborted - * @return true if all the regions are assigned false otherwise. - * @throws InterruptedException - */ - protected boolean waitForAssignment(final Collection regionSet, - final boolean waitTillAllAssigned, final long deadline) throws InterruptedException { - // We're not synchronizing on regionsInTransition now because we don't use any iterator. - while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) { - int failedOpenCount = 0; - Iterator regionInfoIterator = regionSet.iterator(); - while (regionInfoIterator.hasNext()) { - HRegionInfo hri = regionInfoIterator.next(); - if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri, - State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) { - regionInfoIterator.remove(); - } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) { - failedOpenCount++; - } - } - if (!waitTillAllAssigned) { - // No need to wait, let assignment going on asynchronously - break; - } - if (!regionSet.isEmpty()) { - if (failedOpenCount == regionSet.size()) { - // all the regions we are waiting had an error on open. - break; - } - regionStates.waitForUpdate(100); - } - } - return regionSet.isEmpty(); - } - - /** - * Assigns the hbase:meta region or a replica. - *

- * Assumes that hbase:meta is currently closed and is not being actively served by - * any RegionServer. - * @param hri TODO - */ - public void assignMeta(HRegionInfo hri) throws KeeperException { - regionStates.updateRegionState(hri, State.OFFLINE); - assign(hri); - } - - /** - * Assigns specified regions retaining assignments, if any. - *

- * This is a synchronous call and will return once every region has been - * assigned. If anything fails, an exception is thrown - * @throws InterruptedException - * @throws IOException - */ - public void assign(Map regions) - throws IOException, InterruptedException { - if (regions == null || regions.isEmpty()) { - return; - } - List servers = serverManager.createDestinationServersList(); - if (servers == null || servers.isEmpty()) { - throw new IOException("Found no destination server to assign region(s)"); - } - - // Reuse existing assignment info - Map> bulkPlan = - balancer.retainAssignment(regions, servers); - if (bulkPlan == null) { - throw new IOException("Unable to determine a plan to assign region(s)"); - } - - processBogusAssignments(bulkPlan); - - assign(regions.size(), servers.size(), - "retainAssignment=true", bulkPlan); - } - - /** - * Assigns specified regions round robin, if any. - *

- * This is a synchronous call and will return once every region has been - * assigned. If anything fails, an exception is thrown - * @throws InterruptedException - * @throws IOException - */ - public void assign(List regions) - throws IOException, InterruptedException { - if (regions == null || regions.isEmpty()) { - return; - } - - List servers = serverManager.createDestinationServersList(); - if (servers == null || servers.isEmpty()) { - throw new IOException("Found no destination server to assign region(s)"); - } - - // Generate a round-robin bulk assignment plan - Map> bulkPlan = balancer.roundRobinAssignment(regions, servers); - if (bulkPlan == null) { - throw new IOException("Unable to determine a plan to assign region(s)"); - } - - processBogusAssignments(bulkPlan); - - assign(regions.size(), servers.size(), "round-robin=true", bulkPlan); - } - - private void assign(int regions, int totalServers, - String message, Map> bulkPlan) - throws InterruptedException, IOException { - - int servers = bulkPlan.size(); - if (servers == 1 || (regions < bulkAssignThresholdRegions - && servers < bulkAssignThresholdServers)) { - - // Not use bulk assignment. This could be more efficient in small - // cluster, especially mini cluster for testing, so that tests won't time out - if (LOG.isTraceEnabled()) { - LOG.trace("Not using bulk assignment since we are assigning only " + regions + - " region(s) to " + servers + " server(s)"); - } - - // invoke assignment (async) - ArrayList userRegionSet = new ArrayList<>(regions); - for (Map.Entry> plan: bulkPlan.entrySet()) { - if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) { - for (HRegionInfo region: plan.getValue()) { - if (!regionStates.isRegionOnline(region)) { - invokeAssign(region); - if (!region.getTable().isSystemTable()) { - userRegionSet.add(region); - } - } - } - } - } - - // wait for assignment completion - if (!waitForAssignment(userRegionSet, true, userRegionSet.size(), - System.currentTimeMillis())) { - LOG.debug("some user regions are still in transition: " + userRegionSet); - } - } else { - LOG.info("Bulk assigning " + regions + " region(s) across " - + totalServers + " server(s), " + message); - - // Use fixed count thread pool assigning. - BulkAssigner ba = new GeneralBulkAssigner( - this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned); - ba.bulkAssign(); - LOG.info("Bulk assigning done"); - } - } - - /** - * Assigns all user regions, if any exist. Used during cluster startup. - *

- * This is a synchronous call and will return once every region has been - * assigned. If anything fails, an exception is thrown and the cluster - * should be shutdown. - * @throws InterruptedException - * @throws IOException - */ - private void assignAllUserRegions(Map allRegions) - throws IOException, InterruptedException { - if (allRegions == null || allRegions.isEmpty()) return; - - // Determine what type of assignment to do on startup - boolean retainAssignment = server.getConfiguration(). - getBoolean("hbase.master.startup.retainassign", true); - - Set regionsFromMetaScan = allRegions.keySet(); - if (retainAssignment) { - assign(allRegions); - } else { - List regions = new ArrayList<>(regionsFromMetaScan); - assign(regions); - } - - for (HRegionInfo hri : regionsFromMetaScan) { - TableName tableName = hri.getTable(); - if (!tableStateManager.isTableState(tableName, - TableState.State.ENABLED)) { - setEnabledTable(tableName); - } - } - // assign all the replicas that were not recorded in the meta - assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server)); - } - - /** - * Get number of replicas of a table - */ - private static int getNumReplicas(MasterServices master, TableName table) { - int numReplica = 1; - try { - HTableDescriptor htd = master.getTableDescriptors().get(table); - if (htd == null) { - LOG.warn("master can not get TableDescriptor from table '" + table); - } else { - numReplica = htd.getRegionReplication(); - } - } catch (IOException e){ - LOG.warn("Couldn't get the replication attribute of the table " + table + " due to " - + e.getMessage()); - } - return numReplica; - } - - /** - * Get a list of replica regions that are: - * not recorded in meta yet. We might not have recorded the locations - * for the replicas since the replicas may not have been online yet, master restarted - * in the middle of assigning, ZK erased, etc. - * @param regionsRecordedInMeta the list of regions we know are recorded in meta - * either as a default, or, as the location of a replica - * @param master - * @return list of replica regions - * @throws IOException - */ - public static List replicaRegionsNotRecordedInMeta( - Set regionsRecordedInMeta, MasterServices master)throws IOException { - List regionsNotRecordedInMeta = new ArrayList<>(); - for (HRegionInfo hri : regionsRecordedInMeta) { - TableName table = hri.getTable(); - if(master.getTableDescriptors().get(table) == null) - continue; - int desiredRegionReplication = getNumReplicas(master, table); - for (int i = 0; i < desiredRegionReplication; i++) { - HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i); - if (regionsRecordedInMeta.contains(replica)) continue; - regionsNotRecordedInMeta.add(replica); - } - } - return regionsNotRecordedInMeta; - } - - /** - * Rebuild the list of user regions and assignment information. - * Updates regionstates with findings as we go through list of regions. - * @return set of servers not online that hosted some regions according to a scan of hbase:meta - * @throws IOException - */ - Set rebuildUserRegions() throws - IOException, KeeperException { - Set disabledOrEnablingTables = tableStateManager.getTablesInStates( - TableState.State.DISABLED, TableState.State.ENABLING); - - Set disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates( - TableState.State.DISABLED, - TableState.State.DISABLING, - TableState.State.ENABLING); - - // Region assignment from META - List results = MetaTableAccessor.fullScanRegions(server.getConnection()); - // Get any new but slow to checkin region server that joined the cluster - Set onlineServers = serverManager.getOnlineServers().keySet(); - // Set of offline servers to be returned - Set offlineServers = new HashSet<>(); - // Iterate regions in META - for (Result result : results) { - if (result == null && LOG.isDebugEnabled()){ - LOG.debug("null result from meta - ignoring but this is strange."); - continue; - } - // keep a track of replicas to close. These were the replicas of the originally - // unmerged regions. The master might have closed them before but it mightn't - // maybe because it crashed. - PairOfSameType p = MetaTableAccessor.getMergeRegions(result); - if (p.getFirst() != null && p.getSecond() != null) { - int numReplicas = getNumReplicas(server, p.getFirst().getTable()); - for (HRegionInfo merge : p) { - for (int i = 1; i < numReplicas; i++) { - replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i)); - } - } - } - RegionLocations rl = MetaTableAccessor.getRegionLocations(result); - if (rl == null) { - continue; - } - HRegionLocation[] locations = rl.getRegionLocations(); - if (locations == null) { - continue; - } - for (HRegionLocation hrl : locations) { - if (hrl == null) continue; - HRegionInfo regionInfo = hrl.getRegionInfo(); - if (regionInfo == null) continue; - int replicaId = regionInfo.getReplicaId(); - State state = RegionStateStore.getRegionState(result, replicaId); - // keep a track of replicas to close. These were the replicas of the split parents - // from the previous life of the master. The master should have closed them before - // but it couldn't maybe because it crashed - if (replicaId == 0 && state.equals(State.SPLIT)) { - for (HRegionLocation h : locations) { - replicasToClose.add(h.getRegionInfo()); - } - } - ServerName lastHost = hrl.getServerName(); - ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId); - regionStates.createRegionState(regionInfo, state, regionLocation, lastHost); - if (!regionStates.isRegionInState(regionInfo, State.OPEN)) { - // Region is not open (either offline or in transition), skip - continue; - } - TableName tableName = regionInfo.getTable(); - if (!onlineServers.contains(regionLocation)) { - // Region is located on a server that isn't online - offlineServers.add(regionLocation); - } else if (!disabledOrEnablingTables.contains(tableName)) { - // Region is being served and on an active server - // add only if region not in disabled or enabling table - regionStates.regionOnline(regionInfo, regionLocation); - balancer.regionOnline(regionInfo, regionLocation); - } - // need to enable the table if not disabled or disabling or enabling - // this will be used in rolling restarts - if (!disabledOrDisablingOrEnabling.contains(tableName) - && !getTableStateManager().isTableState(tableName, - TableState.State.ENABLED)) { - setEnabledTable(tableName); - } - } - } - return offlineServers; - } - - /** - * Processes list of regions in transition at startup - */ - void processRegionsInTransition(Collection regionsInTransition) { - // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions - // in case the RPC call is not sent out yet before the master was shut down - // since we update the state before we send the RPC call. We can't update - // the state after the RPC call. Otherwise, we don't know what's happened - // to the region if the master dies right after the RPC call is out. - for (RegionState regionState: regionsInTransition) { - LOG.info("Processing " + regionState); - ServerName serverName = regionState.getServerName(); - // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that - // case, try assigning it here. - if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) { - LOG.info("Server " + serverName + " isn't online. SSH will handle this"); - continue; // SSH will handle it - } - HRegionInfo regionInfo = regionState.getRegion(); - RegionState.State state = regionState.getState(); - switch (state) { - case CLOSED: - invokeAssign(regionState.getRegion()); - break; - case PENDING_OPEN: - retrySendRegionOpen(regionState); - break; - case PENDING_CLOSE: - retrySendRegionClose(regionState); - break; - case FAILED_CLOSE: - case FAILED_OPEN: - invokeUnAssign(regionInfo); - break; - default: - // No process for other states - break; - } - } - } - - /** - * At master failover, for pending_open region, make sure - * sendRegionOpen RPC call is sent to the target regionserver - */ - private void retrySendRegionOpen(final RegionState regionState) { - this.executorService.submit( - new EventHandler(server, EventType.M_MASTER_RECOVERY) { - @Override - public void process() throws IOException { - HRegionInfo hri = regionState.getRegion(); - ServerName serverName = regionState.getServerName(); - ReentrantLock lock = locker.acquireLock(hri.getEncodedName()); - try { - for (int i = 1; i <= maximumAttempts; i++) { - if (!serverManager.isServerOnline(serverName) - || server.isStopped() || server.isAborted()) { - return; // No need any more - } - try { - if (!regionState.equals(regionStates.getRegionState(hri))) { - return; // Region is not in the expected state any more - } - List favoredNodes = ServerName.EMPTY_SERVER_LIST; - if (shouldAssignFavoredNodes(hri)) { - FavoredNodesManager fnm = ((MasterServices)server).getFavoredNodesManager(); - favoredNodes = fnm.getFavoredNodesWithDNPort(hri); - } - serverManager.sendRegionOpen(serverName, hri, favoredNodes); - return; // we're done - } catch (Throwable t) { - if (t instanceof RemoteException) { - t = ((RemoteException) t).unwrapRemoteException(); - } - if (t instanceof FailedServerException && i < maximumAttempts) { - // In case the server is in the failed server list, no point to - // retry too soon. Retry after the failed_server_expiry time - try { - Configuration conf = this.server.getConfiguration(); - long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, - RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); - if (LOG.isDebugEnabled()) { - LOG.debug(serverName + " is on failed server list; waiting " - + sleepTime + "ms", t); - } - Thread.sleep(sleepTime); - continue; - } catch (InterruptedException ie) { - LOG.warn("Failed to assign " - + hri.getRegionNameAsString() + " since interrupted", ie); - regionStates.updateRegionState(hri, State.FAILED_OPEN); - Thread.currentThread().interrupt(); - return; - } - } - if (serverManager.isServerOnline(serverName) - && t instanceof java.net.SocketTimeoutException) { - i--; // reset the try count - } else { - LOG.info("Got exception in retrying sendRegionOpen for " - + regionState + "; try=" + i + " of " + maximumAttempts, t); - } - Threads.sleep(100); - } - } - // Run out of attempts - regionStates.updateRegionState(hri, State.FAILED_OPEN); - } finally { - lock.unlock(); - } - } - }); - } - - /** - * At master failover, for pending_close region, make sure - * sendRegionClose RPC call is sent to the target regionserver - */ - private void retrySendRegionClose(final RegionState regionState) { - this.executorService.submit( - new EventHandler(server, EventType.M_MASTER_RECOVERY) { - @Override - public void process() throws IOException { - HRegionInfo hri = regionState.getRegion(); - ServerName serverName = regionState.getServerName(); - ReentrantLock lock = locker.acquireLock(hri.getEncodedName()); - try { - for (int i = 1; i <= maximumAttempts; i++) { - if (!serverManager.isServerOnline(serverName) - || server.isStopped() || server.isAborted()) { - return; // No need any more - } - try { - if (!regionState.equals(regionStates.getRegionState(hri))) { - return; // Region is not in the expected state any more - } - serverManager.sendRegionClose(serverName, hri, null); - return; // Done. - } catch (Throwable t) { - if (t instanceof RemoteException) { - t = ((RemoteException) t).unwrapRemoteException(); - } - if (t instanceof FailedServerException && i < maximumAttempts) { - // In case the server is in the failed server list, no point to - // retry too soon. Retry after the failed_server_expiry time - try { - Configuration conf = this.server.getConfiguration(); - long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, - RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); - if (LOG.isDebugEnabled()) { - LOG.debug(serverName + " is on failed server list; waiting " - + sleepTime + "ms", t); - } - Thread.sleep(sleepTime); - continue; - } catch (InterruptedException ie) { - LOG.warn("Failed to unassign " - + hri.getRegionNameAsString() + " since interrupted", ie); - regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE); - Thread.currentThread().interrupt(); - return; - } - } - if (serverManager.isServerOnline(serverName) - && t instanceof java.net.SocketTimeoutException) { - i--; // reset the try count - } else { - LOG.info("Got exception in retrying sendRegionClose for " - + regionState + "; try=" + i + " of " + maximumAttempts, t); - } - Threads.sleep(100); - } - } - // Run out of attempts - regionStates.updateRegionState(hri, State.FAILED_CLOSE); - } finally { - lock.unlock(); - } - } - }); - } - - /** - * Set Regions in transitions metrics. - * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized. - * This iterator is not fail fast, which may lead to stale read; but that's better than - * creating a copy of the map for metrics computation, as this method will be invoked - * on a frequent interval. - */ - public void updateRegionsInTransitionMetrics() { - long currentTime = System.currentTimeMillis(); - int totalRITs = 0; - int totalRITsOverThreshold = 0; - long oldestRITTime = 0; - int ritThreshold = this.server.getConfiguration(). - getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000); - for (RegionState state: regionStates.getRegionsInTransition()) { - totalRITs++; - long ritTime = currentTime - state.getStamp(); - if (ritTime > ritThreshold) { // more than the threshold - totalRITsOverThreshold++; - } - if (oldestRITTime < ritTime) { - oldestRITTime = ritTime; - } - } - if (this.metricsAssignmentManager != null) { - this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime); - this.metricsAssignmentManager.updateRITCount(totalRITs); - this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold); - } - } - - /** - * @param region Region whose plan we are to clear. - */ - private void clearRegionPlan(final HRegionInfo region) { - synchronized (this.regionPlans) { - this.regionPlans.remove(region.getEncodedName()); - } - } - - /** - * Wait on region to clear regions-in-transition. - * @param hri Region to wait on. - * @throws IOException - */ - public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri) - throws IOException, InterruptedException { - waitOnRegionToClearRegionsInTransition(hri, -1L); - } - - /** - * Wait on region to clear regions-in-transition or time out - * @param hri - * @param timeOut Milliseconds to wait for current region to be out of transition state. - * @return True when a region clears regions-in-transition before timeout otherwise false - * @throws InterruptedException - */ - public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut) - throws InterruptedException { - if (!regionStates.isRegionInTransition(hri)) { - return true; - } - long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime() - + timeOut; - // There is already a timeout monitor on regions in transition so I - // should not have to have one here too? - LOG.info("Waiting for " + hri.getEncodedName() + - " to leave regions-in-transition, timeOut=" + timeOut + " ms."); - while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) { - regionStates.waitForUpdate(100); - if (EnvironmentEdgeManager.currentTime() > end) { - LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned."); - return false; - } - } - if (this.server.isStopped()) { - LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set"); - return false; - } - return true; - } - - void invokeAssign(HRegionInfo regionInfo) { - threadPoolExecutorService.submit(new AssignCallable(this, regionInfo)); - } - - void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) { - scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable( - new AssignCallable(this, regionInfo)), sleepMillis, TimeUnit.MILLISECONDS); - } - - void invokeUnAssign(HRegionInfo regionInfo) { - threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo)); - } - - public boolean isCarryingMeta(ServerName serverName) { - return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO); - } - - public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) { - return isCarryingRegion(serverName, - RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId)); - } - - public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) { - return isCarryingRegion(serverName, metaHri); - } - - /** - * Check if the shutdown server carries the specific region. - * @return whether the serverName currently hosts the region - */ - private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) { - RegionState regionState = regionStates.getRegionTransitionState(hri); - ServerName transitionAddr = regionState != null? regionState.getServerName(): null; - if (transitionAddr != null) { - boolean matchTransitionAddr = transitionAddr.equals(serverName); - LOG.debug("Checking region=" + hri.getRegionNameAsString() - + ", transitioning on server=" + matchTransitionAddr - + " server being checked: " + serverName - + ", matches=" + matchTransitionAddr); - return matchTransitionAddr; - } - - ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri); - boolean matchAssignedAddr = serverName.equals(assignedAddr); - LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() - + " is on server=" + assignedAddr + ", server being checked: " - + serverName); - return matchAssignedAddr; - } - - /** - * Clean out crashed server removing any assignments. - * @param sn Server that went down. - * @return list of regions in transition on this server - */ - public List cleanOutCrashedServerReferences(final ServerName sn) { - // Clean out any existing assignment plans for this server - synchronized (this.regionPlans) { - for (Iterator > i = this.regionPlans.entrySet().iterator(); - i.hasNext();) { - Map.Entry e = i.next(); - ServerName otherSn = e.getValue().getDestination(); - // The name will be null if the region is planned for a random assign. - if (otherSn != null && otherSn.equals(sn)) { - // Use iterator's remove else we'll get CME - i.remove(); - } - } - } - List rits = regionStates.serverOffline(sn); - for (Iterator it = rits.iterator(); it.hasNext(); ) { - HRegionInfo hri = it.next(); - String encodedName = hri.getEncodedName(); - - // We need a lock on the region as we could update it - Lock lock = locker.acquireLock(encodedName); - try { - RegionState regionState = regionStates.getRegionTransitionState(encodedName); - if (regionState == null - || (regionState.getServerName() != null && !regionState.isOnServer(sn)) - || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN, - State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) { - LOG.info("Skip " + regionState + " since it is not opening/failed_close" - + " on the dead server any more: " + sn); - it.remove(); - } else { - if