Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CE3D118CB5 for ; Fri, 19 Feb 2016 19:26:52 +0000 (UTC) Received: (qmail 21920 invoked by uid 500); 19 Feb 2016 19:26:52 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 21803 invoked by uid 500); 19 Feb 2016 19:26:52 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 21612 invoked by uid 99); 19 Feb 2016 19:26:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Feb 2016 19:26:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 458ABE09C5; Fri, 19 Feb 2016 19:26:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Fri, 19 Feb 2016 19:26:59 -0000 Message-Id: In-Reply-To: <8a758ff426c54eddb51ed8c2e87e2926@git.apache.org> References: <8a758ff426c54eddb51ed8c2e87e2926@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/15] hbase-site git commit: Published site at 8eedc967515a4d9133068962fe029160d24e6f95. 
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f564c619/devapidocs/src-html/org/apache/hadoop/hbase/master/AssignmentManager.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/AssignmentManager.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/AssignmentManager.html index addaeb1..96c4e73 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/master/AssignmentManager.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/AssignmentManager.html @@ -26,2906 +26,2948 @@ 018 */ 019package org.apache.hadoop.hbase.master; 020 -021import java.io.IOException; -022import java.util.ArrayList; -023import java.util.Collection; -024import java.util.Collections; -025import java.util.HashMap; -026import java.util.HashSet; -027import java.util.Iterator; -028import java.util.List; -029import java.util.Map; -030import java.util.NavigableMap; -031import java.util.Random; -032import java.util.Set; -033import java.util.TreeMap; -034import java.util.concurrent.Callable; -035import java.util.concurrent.ConcurrentHashMap; -036import java.util.concurrent.CopyOnWriteArrayList; -037import java.util.concurrent.TimeUnit; -038import java.util.concurrent.atomic.AtomicBoolean; -039import java.util.concurrent.atomic.AtomicInteger; -040import java.util.concurrent.locks.Lock; -041import java.util.concurrent.locks.ReentrantLock; -042 -043import org.apache.commons.logging.Log; -044import org.apache.commons.logging.LogFactory; -045import org.apache.hadoop.hbase.classification.InterfaceAudience; -046import org.apache.hadoop.conf.Configuration; -047import org.apache.hadoop.fs.FileSystem; -048import org.apache.hadoop.fs.Path; -049import org.apache.hadoop.hbase.CoordinatedStateException; -050import org.apache.hadoop.hbase.HBaseIOException; -051import org.apache.hadoop.hbase.HConstants; -052import org.apache.hadoop.hbase.HRegionInfo; -053import org.apache.hadoop.hbase.HRegionLocation; -054import 
org.apache.hadoop.hbase.HTableDescriptor; -055import org.apache.hadoop.hbase.MetaTableAccessor; -056import org.apache.hadoop.hbase.NotServingRegionException; -057import org.apache.hadoop.hbase.RegionLocations; -058import org.apache.hadoop.hbase.RegionStateListener; -059import org.apache.hadoop.hbase.ServerName; -060import org.apache.hadoop.hbase.TableName; -061import org.apache.hadoop.hbase.TableNotFoundException; -062import org.apache.hadoop.hbase.client.RegionReplicaUtil; -063import org.apache.hadoop.hbase.client.Result; -064import org.apache.hadoop.hbase.client.TableState; -065import org.apache.hadoop.hbase.executor.EventHandler; -066import org.apache.hadoop.hbase.executor.EventType; -067import org.apache.hadoop.hbase.executor.ExecutorService; -068import org.apache.hadoop.hbase.ipc.FailedServerException; -069import org.apache.hadoop.hbase.ipc.RpcClient; -070import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; -071import org.apache.hadoop.hbase.master.RegionState.State; -072import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; -073import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer; -074import org.apache.hadoop.hbase.master.handler.DisableTableHandler; -075import org.apache.hadoop.hbase.master.handler.EnableTableHandler; -076import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; -077import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; -078import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -079import org.apache.hadoop.hbase.quotas.QuotaExceededException; -080import org.apache.hadoop.hbase.regionserver.RegionOpeningState; -081import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; -082import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -083import org.apache.hadoop.hbase.wal.DefaultWALProvider; -084import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -085import org.apache.hadoop.hbase.util.FSUtils; -086import org.apache.hadoop.hbase.util.KeyLocker; -087import org.apache.hadoop.hbase.util.Pair; -088import org.apache.hadoop.hbase.util.PairOfSameType; -089import org.apache.hadoop.hbase.util.Threads; -090import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -091import org.apache.hadoop.ipc.RemoteException; -092import org.apache.hadoop.util.StringUtils; -093import org.apache.zookeeper.KeeperException; -094 -095import com.google.common.annotations.VisibleForTesting; -096 -097/** -098 * Manages and performs region assignment. -099 * Related communications with regionserver are all done over RPC. -100 */ -101@InterfaceAudience.Private -102public class AssignmentManager { -103 private static final Log LOG = LogFactory.getLog(AssignmentManager.class); -104 -105 protected final MasterServices server; -106 -107 private ServerManager serverManager; -108 -109 private boolean shouldAssignRegionsWithFavoredNodes; -110 -111 private LoadBalancer balancer; -112 -113 private final MetricsAssignmentManager metricsAssignmentManager; -114 -115 private final TableLockManager tableLockManager; -116 -117 private AtomicInteger numRegionsOpened = new AtomicInteger(0); -118 -119 final private KeyLocker<String> locker = new KeyLocker<String>(); -120 -121 Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>()); -122 -123 /** -124 * Map of regions to reopen after the schema of a table is changed. Key - -125 * encoded region name, value - HRegionInfo -126 */ -127 private final Map <String, HRegionInfo> regionsToReopen; -128 -129 /* -130 * Maximum times we recurse an assignment/unassignment. -131 * See below in {@link #assign()} and {@link #unassign()}. 
-132 */ -133 private final int maximumAttempts; -134 -135 /** -136 * The sleep time for which the assignment will wait before retrying in case of -137 * hbase:meta assignment failure due to lack of availability of region plan or bad region plan -138 */ -139 private final long sleepTimeBeforeRetryingMetaAssignment; -140 -141 /** Plans for region movement. Key is the encoded version of a region name*/ -142 // TODO: When do plans get cleaned out? Ever? In server open and in server -143 // shutdown processing -- St.Ack -144 // All access to this Map must be synchronized. -145 final NavigableMap<String, RegionPlan> regionPlans = -146 new TreeMap<String, RegionPlan>(); -147 -148 private final TableStateManager tableStateManager; -149 -150 private final ExecutorService executorService; -151 -152 // Thread pool executor service. TODO, consolidate with executorService? -153 private java.util.concurrent.ExecutorService threadPoolExecutorService; -154 -155 private final RegionStates regionStates; -156 -157 // The threshold to use bulk assigning. Using bulk assignment -158 // only if assigning at least this many regions to at least this -159 // many servers. If assigning fewer regions to fewer servers, -160 // bulk assigning may be not as efficient. -161 private final int bulkAssignThresholdRegions; -162 private final int bulkAssignThresholdServers; -163 private final int bulkPerRegionOpenTimeGuesstimate; -164 -165 // Should bulk assignment wait till all regions are assigned, -166 // or it is timed out? This is useful to measure bulk assignment -167 // performance, but not needed in most use cases. -168 private final boolean bulkAssignWaitTillAllAssigned; -169 -170 /** -171 * Indicator that AssignmentManager has recovered the region states so -172 * that ServerShutdownHandler can be fully enabled and re-assign regions -173 * of dead servers. So that when re-assignment happens, AssignmentManager -174 * has proper region states. -175 * -176 * Protected to ease testing. 
-177 */ -178 protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false); -179 -180 /** -181 * A map to track the count a region fails to open in a row. -182 * So that we don't try to open a region forever if the failure is -183 * unrecoverable. We don't put this information in region states -184 * because we don't expect this to happen frequently; we don't -185 * want to copy this information over during each state transition either. -186 */ -187 private final ConcurrentHashMap<String, AtomicInteger> -188 failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>(); -189 -190 // In case not using ZK for region assignment, region states -191 // are persisted in meta with a state store -192 private final RegionStateStore regionStateStore; -193 -194 /** -195 * For testing only! Set to true to skip handling of split. -196 */ -197 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") -198 public static boolean TEST_SKIP_SPLIT_HANDLING = false; -199 -200 /** Listeners that are called on assignment events. */ -201 private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>(); -202 -203 private RegionStateListener regionStateListener; -204 -205 /** -206 * Constructs a new assignment manager. 
-207 * -208 * @param server instance of HMaster this AM running inside -209 * @param serverManager serverManager for associated HMaster -210 * @param balancer implementation of {@link LoadBalancer} -211 * @param service Executor service -212 * @param metricsMaster metrics manager -213 * @param tableLockManager TableLock manager -214 * @throws IOException -215 */ -216 public AssignmentManager(MasterServices server, ServerManager serverManager, -217 final LoadBalancer balancer, -218 final ExecutorService service, MetricsMaster metricsMaster, -219 final TableLockManager tableLockManager, -220 final TableStateManager tableStateManager) -221 throws IOException { -222 this.server = server; -223 this.serverManager = serverManager; -224 this.executorService = service; -225 this.regionStateStore = new RegionStateStore(server); -226 this.regionsToReopen = Collections.synchronizedMap -227 (new HashMap<String, HRegionInfo> ()); -228 Configuration conf = server.getConfiguration(); -229 // Only read favored nodes if using the favored nodes load balancer. -230 this.shouldAssignRegionsWithFavoredNodes = conf.getClass( -231 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( -232 FavoredNodeLoadBalancer.class); -233 -234 this.tableStateManager = tableStateManager; -235 -236 // This is the max attempts, not retries, so it should be at least 1. 
-237 this.maximumAttempts = Math.max(1, -238 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); -239 this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( -240 "hbase.meta.assignment.retry.sleeptime", 1000l); -241 this.balancer = balancer; -242 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); -243 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( -244 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); -245 this.regionStates = new RegionStates( -246 server, tableStateManager, serverManager, regionStateStore); -247 -248 this.bulkAssignWaitTillAllAssigned = -249 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); -250 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); -251 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); -252 this.bulkPerRegionOpenTimeGuesstimate = -253 conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); -254 -255 this.metricsAssignmentManager = new MetricsAssignmentManager(); -256 this.tableLockManager = tableLockManager; -257 } -258 -259 /** -260 * Add the listener to the notification list. -261 * @param listener The AssignmentListener to register -262 */ -263 public void registerListener(final AssignmentListener listener) { -264 this.listeners.add(listener); -265 } -266 -267 /** -268 * Remove the listener from the notification list. -269 * @param listener The AssignmentListener to unregister -270 */ -271 public boolean unregisterListener(final AssignmentListener listener) { -272 return this.listeners.remove(listener); -273 } -274 -275 /** -276 * @return Instance of ZKTableStateManager. -277 */ -278 public TableStateManager getTableStateManager() { -279 // These are 'expensive' to make involving trip to zk ensemble so allow -280 // sharing. 
-281 return this.tableStateManager; -282 } -283 -284 /** -285 * This SHOULD not be public. It is public now -286 * because of some unit tests. -287 * -288 * TODO: make it package private and keep RegionStates in the master package -289 */ -290 public RegionStates getRegionStates() { -291 return regionStates; -292 } -293 -294 /** -295 * Used in some tests to mock up region state in meta -296 */ -297 @VisibleForTesting -298 RegionStateStore getRegionStateStore() { -299 return regionStateStore; -300 } -301 -302 public RegionPlan getRegionReopenPlan(HRegionInfo hri) { -303 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri)); -304 } -305 -306 /** -307 * Add a regionPlan for the specified region. -308 * @param encodedName -309 * @param plan -310 */ -311 public void addPlan(String encodedName, RegionPlan plan) { -312 synchronized (regionPlans) { -313 regionPlans.put(encodedName, plan); -314 } -315 } -316 -317 /** -318 * Add a map of region plans. -319 */ -320 public void addPlans(Map<String, RegionPlan> plans) { -321 synchronized (regionPlans) { -322 regionPlans.putAll(plans); -323 } -324 } -325 -326 /** -327 * Set the list of regions that will be reopened -328 * because of an update in table schema -329 * -330 * @param regions -331 * list of regions that should be tracked for reopen -332 */ -333 public void setRegionsToReopen(List <HRegionInfo> regions) { -334 for(HRegionInfo hri : regions) { -335 regionsToReopen.put(hri.getEncodedName(), hri); -336 } -337 } -338 -339 /** -340 * Used by the client to identify if all regions have the schema updates -341 * -342 * @param tableName -343 * @return Pair indicating the status of the alter command -344 * @throws IOException -345 */ -346 public Pair<Integer, Integer> getReopenStatus(TableName tableName) -347 throws IOException { -348 List<HRegionInfo> hris; -349 if (TableName.META_TABLE_NAME.equals(tableName)) { -350 hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper()); -351 } else { -352 
hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true); -353 } -354 -355 Integer pending = 0; -356 for (HRegionInfo hri : hris) { -357 String name = hri.getEncodedName(); -358 // no lock concurrent access ok: sequential consistency respected. -359 if (regionsToReopen.containsKey(name) -360 || regionStates.isRegionInTransition(name)) { -361 pending++; -362 } -363 } -364 return new Pair<Integer, Integer>(pending, hris.size()); -365 } -366 -367 /** -368 * Used by ServerShutdownHandler to make sure AssignmentManager has completed -369 * the failover cleanup before re-assigning regions of dead servers. So that -370 * when re-assignment happens, AssignmentManager has proper region states. -371 */ -372 public boolean isFailoverCleanupDone() { -373 return failoverCleanupDone.get(); -374 } -375 -376 /** -377 * To avoid racing with AM, external entities may need to lock a region, -378 * for example, when SSH checks what regions to skip re-assigning. -379 */ -380 public Lock acquireRegionLock(final String encodedName) { -381 return locker.acquireLock(encodedName); -382 } -383 -384 /** -385 * Now, failover cleanup is completed. Notify server manager to -386 * process queued up dead servers processing, if any. -387 */ -388 void failoverCleanupDone() { -389 failoverCleanupDone.set(true); -390 serverManager.processQueuedDeadServers(); -391 } -392 -393 /** -394 * Called on startup. -395 * Figures whether a fresh cluster start of we are joining extant running cluster. -396 * @throws IOException -397 * @throws KeeperException -398 * @throws InterruptedException -399 * @throws CoordinatedStateException -400 */ -401 void joinCluster() -402 throws IOException, KeeperException, InterruptedException, CoordinatedStateException { -403 long startTime = System.currentTimeMillis(); -404 // Concurrency note: In the below the accesses on regionsInTransition are -405 // outside of a synchronization block where usually all accesses to RIT are -406 // synchronized. 
The presumption is that in this case it is safe since this -407 // method is being played by a single thread on startup. -408 -409 // TODO: Regions that have a null location and are not in regionsInTransitions -410 // need to be handled. -411 -412 // Scan hbase:meta to build list of existing regions, servers, and assignment -413 // Returns servers who have not checked in (assumed dead) that some regions -414 // were assigned to (according to the meta) -415 Set<ServerName> deadServers = rebuildUserRegions(); -416 -417 // This method will assign all user regions if a clean server startup or -418 // it will reconstruct master state and cleanup any leftovers from previous master process. -419 boolean failover = processDeadServersAndRegionsInTransition(deadServers); -420 -421 recoverTableInDisablingState(); -422 recoverTableInEnablingState(); -423 LOG.info("Joined the cluster in " + (System.currentTimeMillis() -424 - startTime) + "ms, failover=" + failover); -425 } -426 -427 /** -428 * Process all regions that are in transition in zookeeper and also -429 * processes the list of dead servers. -430 * Used by master joining an cluster. If we figure this is a clean cluster -431 * startup, will assign all user regions. -432 * @param deadServers Set of servers that are offline probably legitimately that were carrying -433 * regions according to a scan of hbase:meta. Can be null. -434 * @throws IOException -435 * @throws InterruptedException -436 */ -437 boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers) -438 throws KeeperException, IOException, InterruptedException, CoordinatedStateException { -439 // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode); -440 boolean failover = !serverManager.getDeadServers().isEmpty(); -441 if (failover) { -442 // This may not be a failover actually, especially if meta is on this master. 
-443 if (LOG.isDebugEnabled()) { -444 LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers()); -445 } -446 } else { -447 // If any one region except meta is assigned, it's a failover. -448 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet(); -449 for (Map.Entry<HRegionInfo, ServerName> en: -450 regionStates.getRegionAssignments().entrySet()) { -451 HRegionInfo hri = en.getKey(); -452 if (!hri.isMetaTable() -453 && onlineServers.contains(en.getValue())) { -454 LOG.debug("Found " + hri + " out on cluster"); -455 failover = true; -456 break; -457 } -458 } -459 if (!failover) { -460 // If any region except meta is in transition on a live server, it's a failover. -461 Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition(); -462 if (!regionsInTransition.isEmpty()) { -463 for (RegionState regionState: regionsInTransition.values()) { -464 ServerName serverName = regionState.getServerName(); -465 if (!regionState.getRegion().isMetaRegion() -466 && serverName != null && onlineServers.contains(serverName)) { -467 LOG.debug("Found " + regionState + " in RITs"); -468 failover = true; -469 break; -470 } -471 } -472 } -473 } -474 } -475 if (!failover) { -476 // If we get here, we have a full cluster restart. It is a failover only -477 // if there are some WALs are not split yet. For meta WALs, they should have -478 // been split already, if any. 
We can walk through those queued dead servers, -479 // if they don't have any WALs, this restart should be considered as a clean one -480 Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet(); -481 if (!queuedDeadServers.isEmpty()) { -482 Configuration conf = server.getConfiguration(); -483 Path rootdir = FSUtils.getRootDir(conf); -484 FileSystem fs = rootdir.getFileSystem(conf); -485 for (ServerName serverName: queuedDeadServers) { -486 // In the case of a clean exit, the shutdown handler would have presplit any WALs and -487 // removed empty directories. -488 Path logDir = new Path(rootdir, -489 DefaultWALProvider.getWALDirectoryName(serverName.toString())); -490 Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT); -491 if (fs.exists(logDir) || fs.exists(splitDir)) { -492 LOG.debug("Found queued dead server " + serverName); -493 failover = true; -494 break; -495 } -496 } -497 if (!failover) { -498 // We figured that it's not a failover, so no need to -499 // work on these re-queued dead servers any more. 
-500 LOG.info("AM figured that it's not a failover and cleaned up " -501 + queuedDeadServers.size() + " queued dead servers"); -502 serverManager.removeRequeuedDeadServers(); -503 } -504 } -505 } -506 -507 Set<TableName> disabledOrDisablingOrEnabling = null; -508 Map<HRegionInfo, ServerName> allRegions = null; -509 -510 if (!failover) { -511 disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates( -512 TableState.State.DISABLED, TableState.State.DISABLING, -513 TableState.State.ENABLING); -514 -515 // Clean re/start, mark all user regions closed before reassignment -516 allRegions = regionStates.closeAllUserRegions( -517 disabledOrDisablingOrEnabling); +021import com.google.common.annotations.VisibleForTesting; +022 +023import java.io.IOException; +024import java.util.ArrayList; +025import java.util.Collection; +026import java.util.Collections; +027import java.util.HashMap; +028import java.util.HashSet; +029import java.util.Iterator; +030import java.util.List; +031import java.util.Map; +032import java.util.NavigableMap; +033import java.util.Random; +034import java.util.Set; +035import java.util.TreeMap; +036import java.util.concurrent.Callable; +037import java.util.concurrent.ConcurrentHashMap; +038import java.util.concurrent.CopyOnWriteArrayList; +039import java.util.concurrent.TimeUnit; +040import java.util.concurrent.atomic.AtomicBoolean; +041import java.util.concurrent.atomic.AtomicInteger; +042import java.util.concurrent.locks.Lock; +043import java.util.concurrent.locks.ReentrantLock; +044 +045import org.apache.commons.logging.Log; +046import org.apache.commons.logging.LogFactory; +047import org.apache.hadoop.hbase.classification.InterfaceAudience; +048import org.apache.hadoop.conf.Configuration; +049import org.apache.hadoop.fs.FileStatus; +050import org.apache.hadoop.fs.FileSystem; +051import org.apache.hadoop.fs.Path; +052import org.apache.hadoop.hbase.CoordinatedStateException; +053import org.apache.hadoop.hbase.HBaseIOException; +054import 
org.apache.hadoop.hbase.HConstants; +055import org.apache.hadoop.hbase.HRegionInfo; +056import org.apache.hadoop.hbase.HRegionLocation; +057import org.apache.hadoop.hbase.HTableDescriptor; +058import org.apache.hadoop.hbase.MetaTableAccessor; +059import org.apache.hadoop.hbase.NotServingRegionException; +060import org.apache.hadoop.hbase.RegionLocations; +061import org.apache.hadoop.hbase.RegionStateListener; +062import org.apache.hadoop.hbase.ServerName; +063import org.apache.hadoop.hbase.TableName; +064import org.apache.hadoop.hbase.TableNotFoundException; +065import org.apache.hadoop.hbase.client.RegionReplicaUtil; +066import org.apache.hadoop.hbase.client.Result; +067import org.apache.hadoop.hbase.client.TableState; +068import org.apache.hadoop.hbase.executor.EventHandler; +069import org.apache.hadoop.hbase.executor.EventType; +070import org.apache.hadoop.hbase.executor.ExecutorService; +071import org.apache.hadoop.hbase.ipc.FailedServerException; +072import org.apache.hadoop.hbase.ipc.RpcClient; +073import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +074import org.apache.hadoop.hbase.master.RegionState.State; +075import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; +076import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer; +077import org.apache.hadoop.hbase.master.handler.DisableTableHandler; +078import org.apache.hadoop.hbase.master.handler.EnableTableHandler; +079import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; +080import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; +081import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +082import org.apache.hadoop.hbase.quotas.QuotaExceededException; +083import org.apache.hadoop.hbase.regionserver.RegionOpeningState; +084import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; +085import 
org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +086import org.apache.hadoop.hbase.wal.DefaultWALProvider; +087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +088import org.apache.hadoop.hbase.util.FSUtils; +089import org.apache.hadoop.hbase.util.KeyLocker; +090import org.apache.hadoop.hbase.util.Pair; +091import org.apache.hadoop.hbase.util.PairOfSameType; +092import org.apache.hadoop.hbase.util.Threads; +093import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +094import org.apache.hadoop.ipc.RemoteException; +095import org.apache.hadoop.util.StringUtils; +096import org.apache.zookeeper.KeeperException; +097 +098/** +099 * Manages and performs region assignment. +100 * Related communications with regionserver are all done over RPC. +101 */ +102@InterfaceAudience.Private +103public class AssignmentManager { +104 private static final Log LOG = LogFactory.getLog(AssignmentManager.class); +105 +106 protected final MasterServices server; +107 +108 private ServerManager serverManager; +109 +110 private boolean shouldAssignRegionsWithFavoredNodes; +111 +112 private LoadBalancer balancer; +113 +114 private final MetricsAssignmentManager metricsAssignmentManager; +115 +116 private final TableLockManager tableLockManager; +117 +118 private AtomicInteger numRegionsOpened = new AtomicInteger(0); +119 +120 final private KeyLocker<String> locker = new KeyLocker<String>(); +121 +122 Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>()); +123 +124 /** +125 * Map of regions to reopen after the schema of a table is changed. Key - +126 * encoded region name, value - HRegionInfo +127 */ +128 private final Map <String, HRegionInfo> regionsToReopen; +129 +130 /* +131 * Maximum times we recurse an assignment/unassignment. +132 * See below in {@link #assign()} and {@link #unassign()}. 
+133 */ +134 private final int maximumAttempts; +135 +136 /** +137 * The sleep time for which the assignment will wait before retrying in case of +138 * hbase:meta assignment failure due to lack of availability of region plan or bad region plan +139 */ +140 private final long sleepTimeBeforeRetryingMetaAssignment; +141 +142 /** Plans for region movement. Key is the encoded version of a region name*/ +143 // TODO: When do plans get cleaned out? Ever? In server open and in server +144 // shutdown processing -- St.Ack +145 // All access to this Map must be synchronized. +146 final NavigableMap<String, RegionPlan> regionPlans = +147 new TreeMap<String, RegionPlan>(); +148 +149 private final TableStateManager tableStateManager; +150 +151 private final ExecutorService executorService; +152 +153 // Thread pool executor service. TODO, consolidate with executorService? +154 private java.util.concurrent.ExecutorService threadPoolExecutorService; +155 +156 private final RegionStates regionStates; +157 +158 // The threshold to use bulk assigning. Using bulk assignment +159 // only if assigning at least this many regions to at least this +160 // many servers. If assigning fewer regions to fewer servers, +161 // bulk assigning may be not as efficient. +162 private final int bulkAssignThresholdRegions; +163 private final int bulkAssignThresholdServers; +164 private final int bulkPerRegionOpenTimeGuesstimate; +165 +166 // Should bulk assignment wait till all regions are assigned, +167 // or it is timed out? This is useful to measure bulk assignment +168 // performance, but not needed in most use cases. +169 private final boolean bulkAssignWaitTillAllAssigned; +170 +171 /** +172 * Indicator that AssignmentManager has recovered the region states so +173 * that ServerShutdownHandler can be fully enabled and re-assign regions +174 * of dead servers. So that when re-assignment happens, AssignmentManager +175 * has proper region states. +176 * +177 * Protected to ease testing. 
+178 */ +179 protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false); +180 +181 /** +182 * A map to track the count a region fails to open in a row. +183 * So that we don't try to open a region forever if the failure is +184 * unrecoverable. We don't put this information in region states +185 * because we don't expect this to happen frequently; we don't +186 * want to copy this information over during each state transition either. +187 */ +188 private final ConcurrentHashMap<String, AtomicInteger> +189 failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>(); +190 +191 // In case not using ZK for region assignment, region states +192 // are persisted in meta with a state store +193 private final RegionStateStore regionStateStore; +194 +195 /** +196 * For testing only! Set to true to skip handling of split. +197 */ +198 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") +199 public static boolean TEST_SKIP_SPLIT_HANDLING = false; +200 +201 /** Listeners that are called on assignment events. */ +202 private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>(); +203 +204 private RegionStateListener regionStateListener; +205 +206 /** +207 * Constructs a new assignment manager. 
+208 * +209 * @param server instance of HMaster this AM running inside +210 * @param serverManager serverManager for associated HMaster +211 * @param balancer implementation of {@link LoadBalancer} +212 * @param service Executor service +213 * @param metricsMaster metrics manager +214 * @param tableLockManager TableLock manager +215 * @throws IOException +216 */ +217 public AssignmentManager(MasterServices server, ServerManager serverManager, +218 final LoadBalancer balancer, +219 final ExecutorService service, MetricsMaster metricsMaster, +220 final TableLockManager tableLockManager, +221 final TableStateManager tableStateManager) +222 throws IOException { +223 this.server = server; +224 this.serverManager = serverManager; +225 this.executorService = service; +226 this.regionStateStore = new RegionStateStore(server); +227 this.regionsToReopen = Collections.synchronizedMap +228 (new HashMap<String, HRegionInfo> ()); +229 Configuration conf = server.getConfiguration(); +230 // Only read favored nodes if using the favored nodes load balancer. +231 this.shouldAssignRegionsWithFavoredNodes = conf.getClass( +232 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals( +233 FavoredNodeLoadBalancer.class); +234 +235 this.tableStateManager = tableStateManager; +236 +237 // This is the max attempts, not retries, so it should be at least 1. 
+238 this.maximumAttempts = Math.max(1, +239 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10)); +240 this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong( +241 "hbase.meta.assignment.retry.sleeptime", 1000l); +242 this.balancer = balancer; +243 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30); +244 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool( +245 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM.")); +246 this.regionStates = new RegionStates( +247 server, tableStateManager, serverManager, regionStateStore); +248 +249 this.bulkAssignWaitTillAllAssigned = +250 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false); +251 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7); +252 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3); +253 this.bulkPerRegionOpenTimeGuesstimate = +254 conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); +255 +256 this.metricsAssignmentManager = new MetricsAssignmentManager(); +257 this.tableLockManager = tableLockManager; +258 } +259 +260 /** +261 * Add the listener to the notification list. +262 * @param listener The AssignmentListener to register +263 */ +264 public void registerListener(final AssignmentListener listener) { +265 this.listeners.add(listener); +266 } +267 +268 /** +269 * Remove the listener from the notification list. +270 * @param listener The AssignmentListener to unregister * @return true if the listener was in the notification list and has now been removed +271 */ +272 public boolean unregisterListener(final AssignmentListener listener) { +273 return this.listeners.remove(listener); +274 } +275 +276 /** +277 * @return the TableStateManager supplied to the constructor. +278 */ +279 public TableStateManager getTableStateManager() { +280 // These are 'expensive' to make involving trip to zk ensemble so allow +281 // sharing. 
+282 return this.tableStateManager; +283 } +284 +285 /** +286 * This SHOULD not be public. It is public now +287 * because of some unit tests. +288 * +289 * TODO: make it package private and keep RegionStates in the master package +290 */ +291 public RegionStates getRegionStates() { +292 return regionStates; +293 } +294 +295 /** +296 * Used in some tests to mock up region state in meta +297 */ +298 @VisibleForTesting +299 RegionStateStore getRegionStateStore() { +300 return regionStateStore; +301 } +302 /** Builds a RegionPlan for reopening hri: null is passed as the plan's second argument and the server regionStates currently reports for hri as its third. NOTE(review): confirm RegionPlan's (region, source, destination) argument order. */ +303 public RegionPlan getRegionReopenPlan(HRegionInfo hri) { +304 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri)); +305 } +306 +307 /** +308 * Add a regionPlan for the specified region. +309 * @param encodedName encoded name of the region; used as the key in regionPlans +310 * @param plan plan to record for that region +311 */ +312 public void addPlan(String encodedName, RegionPlan plan) { +313 synchronized (regionPlans) { +314 regionPlans.put(encodedName, plan); +315 } +316 } +317 +318 /** +319 * Add a map of region plans, copying every entry of plans into regionPlans while holding its monitor. +320 */ +321 public void addPlans(Map<String, RegionPlan> plans) { +322 synchronized (regionPlans) { +323 regionPlans.putAll(plans); +324 } +325 } +326 +327 /** +328 * Add the given regions to the set of regions that will be reopened +329 * because of an update in table schema +330 * (entries are keyed by encoded region name; previously tracked regions are not cleared) +331 * @param regions +332 * list of regions that should be tracked for reopen +333 */ +334 public void setRegionsToReopen(List <HRegionInfo> regions) { +335 for(HRegionInfo hri : regions) { +336 regionsToReopen.put(hri.getEncodedName(), hri); +337 } +338 } +339 +340 /** +341 * Used by the client to identify if all regions have the schema updates +342 * +343 * @param tableName table whose regions are checked +344 * @return Pair of (number of regions still pending reopen or in transition, total number of regions of the table) +345 * @throws IOException +346 */ +347 public Pair<Integer, Integer> getReopenStatus(TableName tableName) +348 throws IOException { +349 List<HRegionInfo> hris; +350 if (TableName.META_TABLE_NAME.equals(tableName)) { +351 hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper()); +352 } else { +353 
hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true); +354 } +355 // NOTE(review): pending could be a primitive int; the boxed Integer is re-boxed on every increment. +356 Integer pending = 0; +357 for (HRegionInfo hri : hris) { +358 String name = hri.getEncodedName(); +359 // no lock concurrent access ok: sequential consistency respected. +360 if (regionsToReopen.containsKey(name) +361 || regionStates.isRegionInTransition(name)) { +362 pending++; +363 } +364 } +365 return new Pair<Integer, Integer>(pending, hris.size()); +366 } +367 +368 /** +369 * Used by ServerShutdownHandler to make sure AssignmentManager has completed +370 * the failover cleanup before re-assigning regions of dead servers, so that +371 * when re-assignment happens, AssignmentManager has proper region states. * @return true once the failoverCleanupDone flag has been set (e.g. by failoverCleanupDone()) +372 */ +373 public boolean isFailoverCleanupDone() { +374 return failoverCleanupDone.get(); +375 } +376 +377 /** +378 * To avoid racing with AM, external entities may need to lock a region, +379 * for example, when SSH checks what regions to skip re-assigning. * @param encodedName encoded name of the region to lock * @return the Lock returned by locker.acquireLock(encodedName); presumably already held on return, so the caller must unlock it (TODO confirm) +380 */ +381 public Lock acquireRegionLock(final String encodedName) { +382 return locker.acquireLock(encodedName); +383 } +384 +385 /** +386 * Marks failover cleanup as completed, then notifies the server manager +387 * to process any queued-up dead servers. +388 */ +389 void failoverCleanupDone() { +390 failoverCleanupDone.set(true); +391 serverManager.processQueuedDeadServers(); +392 } +393 +394 /** +395 * Called on startup. +396 * Figures out whether this is a fresh cluster start or we are joining an extant running cluster. +397 * @throws IOException +398 * @throws KeeperException +399 * @throws InterruptedException +400 * @throws CoordinatedStateException +401 */ +402 void joinCluster() +403 throws IOException, KeeperException, InterruptedException, CoordinatedStateException { +404 long startTime = System.currentTimeMillis(); +405 // Concurrency note: In the below the accesses on regionsInTransition are +406 // outside of a synchronization block where usually all acces