From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Thu, 17 Aug 2017 15:07:32 -0000
Subject: [22/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/33bc9e06/devapidocs/src-html/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.Cluster.AssignRegionAction.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.Cluster.AssignRegionAction.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.Cluster.AssignRegionAction.html
index d9c59a0..13f64df 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.Cluster.AssignRegionAction.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.Cluster.AssignRegionAction.html
@@ -69,1532 +69,1492 @@
 061import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap;
 062import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 063import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
-064
-065/**
-066 * The base class for load balancers. It provides the functions used by
-067 * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
-068 * in the edge cases. It doesn't provide an implementation of the
-069 * actual balancing algorithm.
-070 *
-071 */
-072public abstract class BaseLoadBalancer implements LoadBalancer {
-073  protected static final int MIN_SERVER_BALANCE = 2;
-074  private volatile boolean stopped = false;
-075
-076  private static final List<HRegionInfo> EMPTY_REGION_LIST = new ArrayList<>(0);
-077
-078  static final Predicate<ServerLoad> IDLE_SERVER_PREDICATOR
-079    = load -> load.getNumberOfRegions() == 0;
-080
-081  protected final RegionLocationFinder regionFinder = new RegionLocationFinder();
-082
-083  private static class DefaultRackManager extends RackManager {
-084    @Override
-085    public String getRack(ServerName server) {
-086      return UNKNOWN_RACK;
-087    }
-088  }
-089
-090  /**
-091   * The constructor that uses the basic MetricsBalancer.
-092   */
-093  protected BaseLoadBalancer() {
-094    metricsBalancer = new MetricsBalancer();
-095  }
-096
-097  /**
-098   * This constructor accepts an instance of MetricsBalancer,
-099   * which will be used instead of creating a new one.
-100   */
-101  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
-102    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
-103  }
-104
-105  /**
-106   * An efficient array-based implementation similar to ClusterState for keeping
-107   * the status of the cluster in terms of region assignment and distribution.
-108   * LoadBalancers such as StochasticLoadBalancer use this Cluster object because
-109   * hundreds of thousands of hashmap manipulations would be very costly, which is why this
-110   * class mostly uses indexes and arrays.
-111   *
-112   * Cluster tracks a list of unassigned regions, region assignments, and the server
-113   * topology in terms of server names, hostnames and racks.
-114   */
-115  protected static class Cluster {
-116    ServerName[] servers;
-117    String[] hosts; // ServerName uniquely identifies a region server.
multiple RS can run on the same host -118 String[] racks; -119 boolean multiServersPerHost = false; // whether or not any host has more than one server -120 -121 ArrayList<String> tables; -122 HRegionInfo[] regions; -123 Deque<BalancerRegionLoad>[] regionLoads; -124 private RegionLocationFinder regionFinder; -125 -126 int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality -127 -128 int[] serverIndexToHostIndex; //serverIndex -> host index -129 int[] serverIndexToRackIndex; //serverIndex -> rack index -130 -131 int[][] regionsPerServer; //serverIndex -> region list -132 int[][] regionsPerHost; //hostIndex -> list of regions -133 int[][] regionsPerRack; //rackIndex -> region list -134 int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index -135 int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index -136 int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index -137 -138 int[][] serversPerHost; //hostIndex -> list of server indexes -139 int[][] serversPerRack; //rackIndex -> list of server indexes -140 int[] regionIndexToServerIndex; //regionIndex -> serverIndex -141 int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) -142 int[] regionIndexToTableIndex; //regionIndex -> tableIndex -143 int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions -144 int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS -145 int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary -146 boolean hasRegionReplicas = false; //whether there is regions with replicas -147 -148 Integer[] serverIndicesSortedByRegionCount; -149 Integer[] serverIndicesSortedByLocality; -150 -151 Map<String, Integer> serversToIndex; -152 Map<String, Integer> hostsToIndex; -153 Map<String, Integer> racksToIndex; -154 Map<String, Integer> tablesToIndex; -155 Map<HRegionInfo, Integer> regionsToIndex; -156 float[] localityPerServer; -157 -158 int numServers; -159 int numHosts; -160 int numRacks; -161 int numTables; -162 int numRegions; -163 -164 int numMovedRegions = 0; //num moved regions from the initial configuration -165 Map<ServerName, List<HRegionInfo>> clusterState; -166 -167 protected final RackManager rackManager; -168 // Maps region -> rackIndex -> locality of region on rack -169 private float[][] rackLocalities; -170 // Maps localityType -> region -> [server|rack]Index with highest locality -171 private int[][] regionsToMostLocalEntities; -172 -173 protected Cluster( -174 Map<ServerName, List<HRegionInfo>> clusterState, -175 Map<String, Deque<BalancerRegionLoad>> loads, -176 RegionLocationFinder regionFinder, -177 RackManager rackManager) { -178 this(null, clusterState, loads, regionFinder, rackManager); -179 } -180 -181 @SuppressWarnings("unchecked") -182 protected Cluster( -183 Collection<HRegionInfo> unassignedRegions, -184 Map<ServerName, List<HRegionInfo>> clusterState, -185 Map<String, Deque<BalancerRegionLoad>> loads, -186 RegionLocationFinder regionFinder, -187 RackManager rackManager) { -188 -189 if (unassignedRegions == null) { -190 unassignedRegions = EMPTY_REGION_LIST; -191 } -192 -193 serversToIndex = new HashMap<>(); -194 hostsToIndex = new HashMap<>(); -195 racksToIndex = new HashMap<>(); -196 tablesToIndex = new HashMap<>(); -197 -198 //TODO: We should get the list of tables from master -199 tables = new ArrayList<>(); -200 this.rackManager = rackManager != null 
? rackManager : new DefaultRackManager(); -201 -202 numRegions = 0; -203 -204 List<List<Integer>> serversPerHostList = new ArrayList<>(); -205 List<List<Integer>> serversPerRackList = new ArrayList<>(); -206 this.clusterState = clusterState; -207 this.regionFinder = regionFinder; -208 -209 // Use servername and port as there can be dead servers in this list. We want everything with -210 // a matching hostname and port to have the same index. -211 for (ServerName sn : clusterState.keySet()) { -212 if (sn == null) { -213 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " + -214 "skipping; unassigned regions?"); -215 if (LOG.isTraceEnabled()) { -216 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); -217 } -218 continue; -219 } -220 if (serversToIndex.get(sn.getAddress().toString()) == null) { -221 serversToIndex.put(sn.getHostAndPort(), numServers++); -222 } -223 if (!hostsToIndex.containsKey(sn.getHostname())) { -224 hostsToIndex.put(sn.getHostname(), numHosts++); -225 serversPerHostList.add(new ArrayList<>(1)); -226 } -227 -228 int serverIndex = serversToIndex.get(sn.getHostAndPort()); -229 int hostIndex = hostsToIndex.get(sn.getHostname()); -230 serversPerHostList.get(hostIndex).add(serverIndex); -231 -232 String rack = this.rackManager.getRack(sn); -233 if (!racksToIndex.containsKey(rack)) { -234 racksToIndex.put(rack, numRacks++); -235 serversPerRackList.add(new ArrayList<>()); -236 } -237 int rackIndex = racksToIndex.get(rack); -238 serversPerRackList.get(rackIndex).add(serverIndex); -239 } -240 -241 // Count how many regions there are. -242 for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) { -243 numRegions += entry.getValue().size(); -244 } -245 numRegions += unassignedRegions.size(); -246 -247 regionsToIndex = new HashMap<>(numRegions); -248 servers = new ServerName[numServers]; -249 serversPerHost = new int[numHosts][]; -250 serversPerRack = new int[numRacks][]; -251 regions = new HRegionInfo[numRegions]; -252 regionIndexToServerIndex = new int[numRegions]; -253 initialRegionIndexToServerIndex = new int[numRegions]; -254 regionIndexToTableIndex = new int[numRegions]; -255 regionIndexToPrimaryIndex = new int[numRegions]; -256 regionLoads = new Deque[numRegions]; -257 -258 regionLocations = new int[numRegions][]; -259 serverIndicesSortedByRegionCount = new Integer[numServers]; -260 serverIndicesSortedByLocality = new Integer[numServers]; -261 localityPerServer = new float[numServers]; -262 -263 serverIndexToHostIndex = new int[numServers]; -264 serverIndexToRackIndex = new int[numServers]; -265 regionsPerServer = new int[numServers][]; -266 regionsPerHost = new int[numHosts][]; -267 regionsPerRack = new int[numRacks][]; -268 primariesOfRegionsPerServer = new int[numServers][]; -269 primariesOfRegionsPerHost = new int[numHosts][]; -270 primariesOfRegionsPerRack = new int[numRacks][]; -271 -272 int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; -273 -274 for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) { -275 if (entry.getKey() == null) { -276 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); -277 continue; -278 } -279 int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); -280 -281 // keep the servername if this is the first server name for this hostname -282 // or this servername has the newest startcode. 
-283 if (servers[serverIndex] == null || -284 servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) { -285 servers[serverIndex] = entry.getKey(); -286 } -287 -288 if (regionsPerServer[serverIndex] != null) { -289 // there is another server with the same hostAndPort in ClusterState. -290 // allocate the array for the total size -291 regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; -292 } else { -293 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; -294 } -295 primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length]; -296 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; -297 serverIndicesSortedByLocality[serverIndex] = serverIndex; -298 } -299 -300 hosts = new String[numHosts]; -301 for (Entry<String, Integer> entry : hostsToIndex.entrySet()) { -302 hosts[entry.getValue()] = entry.getKey(); -303 } -304 racks = new String[numRacks]; -305 for (Entry<String, Integer> entry : racksToIndex.entrySet()) { -306 racks[entry.getValue()] = entry.getKey(); -307 } -308 -309 for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) { -310 int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); -311 regionPerServerIndex = 0; -312 -313 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); -314 serverIndexToHostIndex[serverIndex] = hostIndex; -315 -316 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); -317 serverIndexToRackIndex[serverIndex] = rackIndex; -318 -319 for (HRegionInfo region : entry.getValue()) { -320 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); -321 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; -322 regionIndex++; -323 } -324 } -325 -326 for (HRegionInfo region : unassignedRegions) { -327 registerRegion(region, regionIndex, -1, loads, regionFinder); -328 regionIndex++; -329 } -330 -331 for (int i = 0; i < serversPerHostList.size(); i++) { -332 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; -333 for (int j = 0; j < serversPerHost[i].length; j++) { -334 serversPerHost[i][j] = serversPerHostList.get(i).get(j); -335 } -336 if (serversPerHost[i].length > 1) { -337 multiServersPerHost = true; -338 } -339 } -340 -341 for (int i = 0; i < serversPerRackList.size(); i++) { -342 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; -343 for (int j = 0; j < serversPerRack[i].length; j++) { -344 serversPerRack[i][j] = serversPerRackList.get(i).get(j); -345 } -346 } -347 -348 numTables = tables.size(); -349 numRegionsPerServerPerTable = new int[numServers][numTables]; -350 -351 for (int i = 0; i < numServers; i++) { -352 for (int j = 0; j < numTables; j++) { -353 numRegionsPerServerPerTable[i][j] = 0; -354 } -355 } -356 -357 for (int i=0; i < regionIndexToServerIndex.length; i++) { -358 if (regionIndexToServerIndex[i] >= 0) { -359 numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; -360 } -361 } -362 -363 numMaxRegionsPerTable = new int[numTables]; -364 for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { -365 for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) { -366 if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { -367 numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; -368 } -369 } -370 } -371 -372 for (int i = 0; i < regions.length; i ++) { -373 HRegionInfo info = regions[i]; -374 if 
(RegionReplicaUtil.isDefaultReplica(info)) { -375 regionIndexToPrimaryIndex[i] = i; -376 } else { -377 hasRegionReplicas = true; -378 HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); -379 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); -380 } -381 } -382 -383 for (int i = 0; i < regionsPerServer.length; i++) { -384 primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length]; -385 for (int j = 0; j < regionsPerServer[i].length; j++) { -386 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; -387 primariesOfRegionsPerServer[i][j] = primaryIndex; -388 } -389 // sort the regions by primaries. -390 Arrays.sort(primariesOfRegionsPerServer[i]); -391 } -392 -393 // compute regionsPerHost -394 if (multiServersPerHost) { -395 for (int i = 0 ; i < serversPerHost.length; i++) { -396 int numRegionsPerHost = 0; -397 for (int j = 0; j < serversPerHost[i].length; j++) { -398 numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length; -399 } -400 regionsPerHost[i] = new int[numRegionsPerHost]; -401 primariesOfRegionsPerHost[i] = new int[numRegionsPerHost]; -402 } -403 for (int i = 0 ; i < serversPerHost.length; i++) { -404 int numRegionPerHostIndex = 0; -405 for (int j = 0; j < serversPerHost[i].length; j++) { -406 for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) { -407 int region = regionsPerServer[serversPerHost[i][j]][k]; -408 regionsPerHost[i][numRegionPerHostIndex] = region; -409 int primaryIndex = regionIndexToPrimaryIndex[region]; -410 primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex; -411 numRegionPerHostIndex++; -412 } -413 } -414 // sort the regions by primaries. -415 Arrays.sort(primariesOfRegionsPerHost[i]); -416 } -417 } -418 -419 // compute regionsPerRack -420 if (numRacks > 1) { -421 for (int i = 0 ; i < serversPerRack.length; i++) { -422 int numRegionsPerRack = 0; -423 for (int j = 0; j < serversPerRack[i].length; j++) { -424 numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length; -425 } -426 regionsPerRack[i] = new int[numRegionsPerRack]; -427 primariesOfRegionsPerRack[i] = new int[numRegionsPerRack]; -428 } -429 -430 for (int i = 0 ; i < serversPerRack.length; i++) { -431 int numRegionPerRackIndex = 0; -432 for (int j = 0; j < serversPerRack[i].length; j++) { -433 for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) { -434 int region = regionsPerServer[serversPerRack[i][j]][k]; -435 regionsPerRack[i][numRegionPerRackIndex] = region; -436 int primaryIndex = regionIndexToPrimaryIndex[region]; -437 primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex; -438 numRegionPerRackIndex++; -439 } -440 } -441 // sort the regions by primaries. 
-442 Arrays.sort(primariesOfRegionsPerRack[i]); -443 } -444 } -445 } -446 -447 /** Helper for Cluster constructor to handle a region */ -448 private void registerRegion(HRegionInfo region, int regionIndex, -449 int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads, -450 RegionLocationFinder regionFinder) { -451 String tableName = region.getTable().getNameAsString(); -452 if (!tablesToIndex.containsKey(tableName)) { -453 tables.add(tableName); -454 tablesToIndex.put(tableName, tablesToIndex.size()); -455 } -456 int tableIndex = tablesToIndex.get(tableName); -457 -458 regionsToIndex.put(region, regionIndex); -459 regions[regionIndex] = region; -460 regionIndexToServerIndex[regionIndex] = serverIndex; -461 initialRegionIndexToServerIndex[regionIndex] = serverIndex; -462 regionIndexToTableIndex[regionIndex] = tableIndex; -463 -464 // region load -465 if (loads != null) { -466 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); -467 // That could have failed if the RegionLoad is using the other regionName -468 if (rl == null) { -469 // Try getting the region load using encoded name. -470 rl = loads.get(region.getEncodedName()); -471 } -472 regionLoads[regionIndex] = rl; -473 } -474 -475 if (regionFinder != null) { -476 // region location -477 List<ServerName> loc = regionFinder.getTopBlockLocations(region); -478 regionLocations[regionIndex] = new int[loc.size()]; -479 for (int i = 0; i < loc.size(); i++) { -480 regionLocations[regionIndex][i] = loc.get(i) == null ? -1 -481 : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1 -482 : serversToIndex.get(loc.get(i).getHostAndPort())); -483 } -484 } -485 } -486 -487 /** -488 * Returns true iff a given server has less regions than the balanced amount -489 */ -490 public boolean serverHasTooFewRegions(int server) { -491 int minLoad = this.numRegions / numServers; -492 int numRegions = getNumRegions(server); -493 return numRegions < minLoad; -494 } -495 -496 /** -497 * Retrieves and lazily initializes a field storing the locality of -498 * every region/server combination -499 */ -500 public float[][] getOrComputeRackLocalities() { -501 if (rackLocalities == null || regionsToMostLocalEntities == null) { -502 computeCachedLocalities(); -503 } -504 return rackLocalities; -505 } -506 -507 /** -508 * Lazily initializes and retrieves a mapping of region -> server for which region has -509 * the highest the locality -510 */ -511 public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) { -512 if (rackLocalities == null || regionsToMostLocalEntities == null) { -513 computeCachedLocalities(); -514 } -515 return regionsToMostLocalEntities[type.ordinal()]; -516 } -517 -518 /** -519 * Looks up locality from cache of localities. Will create cache if it does -520 * not already exist. -521 */ -522 public float getOrComputeLocality(int region, int entity, LocalityType type) { -523 switch (type) { -524 case SERVER: -525 return getLocalityOfRegion(region, entity); -526 case RACK: -527 return getOrComputeRackLocalities()[region][entity]; -528 default: -529 throw new IllegalArgumentException("Unsupported LocalityType: " + type); -530 } -531 } -532 -533 /** -534 * Returns locality weighted by region size in MB. Will create locality cache -535 * if it does not already exist. 
-536 */ -537 public double getOrComputeWeightedLocality(int region, int server, LocalityType type) { -538 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); -539 } -540 -541 /** -542 * Returns the size in MB from the most recent RegionLoad for region -543 */ -544 public int getRegionSizeMB(int region) { -545 Deque<BalancerRegionLoad> load = regionLoads[region]; -546 // This means regions have no actual data on disk -547 if (load == null) { -548 return 0; -549 } -550 return regionLoads[region].getLast().getStorefileSizeMB(); -551 } -552 -553 /** -554 * Computes and caches the locality for each region/rack combinations, -555 * as well as storing a mapping of region -> server and region -> rack such that server -556 * and rack have the highest locality for region -557 */ -558 private void computeCachedLocalities() { -559 rackLocalities = new float[numRegions][numServers]; -560 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; -561 -562 // Compute localities and find most local server per region -563 for (int region = 0; region < numRegions; region++) { -564 int serverWithBestLocality = 0; -565 float bestLocalityForRegion = 0; -566 for (int server = 0; server < numServers; server++) { -567 // Aggregate per-rack locality -568 float locality = getLocalityOfRegion(region, server); -569 int rack = serverIndexToRackIndex[server]; -570 int numServersInRack = serversPerRack[rack].length; -571 rackLocalities[region][rack] += locality / numServersInRack; -572 -573 if (locality > bestLocalityForRegion) { -574 serverWithBestLocality = server; -575 bestLocalityForRegion = locality; -576 } -577 } -578 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; -579 -580 // Find most local rack per region -581 int rackWithBestLocality = 0; -582 float bestRackLocalityForRegion = 0.0f; -583 for (int rack = 0; rack < numRacks; rack++) { -584 float rackLocality = rackLocalities[region][rack]; -585 if (rackLocality > bestRackLocalityForRegion) { -586 bestRackLocalityForRegion = rackLocality; -587 rackWithBestLocality = rack; -588 } -589 } -590 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; -591 } -592 -593 } -594 -595 /** -596 * Maps region index to rack index -597 */ -598 public int getRackForRegion(int region) { -599 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; -600 } -601 -602 enum LocalityType { -603 SERVER, -604 RACK -605 } -606 -607 /** An action to move or swap a region */ -608 public static class Action { -609 public enum Type { -610 ASSIGN_REGION, -611 MOVE_REGION, -612 SWAP_REGIONS, -613 NULL, -614 } -615 -616 public Type type; -617 public Action (Type type) {this.type = type;} -618 /** Returns an Action which would undo this action */ -619 public Action undoAction() { return this; } -620 @Override -621 public String toString() { return type + ":";} -622 } -623 -624 public static class AssignRegionAction extends Action { -625 public int region; -626 public int server; -627 public AssignRegionAction(int region, int server) { -628 super(Type.ASSIGN_REGION); -629 this.region = region; -630 this.server = server; -631 } -632 @Override -633 public Action undoAction() { -634 // TODO implement this. This action is not being used by the StochasticLB for now -635 // in case it uses it, we should implement this function. 
-636 throw new NotImplementedException(); -637 } -638 @Override -639 public String toString() { -640 return type + ": " + region + ":" + server; -641 } -642 } -643 -644 public static class MoveRegionAction extends Action { -645 public int region; -646 public int fromServer; -647 public int toServer; -648 -649 public MoveRegionAction(int region, int fromServer, int toServer) { -650 super(Type.MOVE_REGION); -651 this.fromServer = fromServer; -652 this.region = region; -653 this.toServer = toServer; -654 } -655 @Override -656 public Action undoAction() { -657 return new MoveRegionAction (region, toServer, fromServer); -658 } -659 @Override -660 public String toString() { -661 return type + ": " + region + ":" + fromServer + " -> " + toServer; -662 } -663 } -664 -665 public static class SwapRegionsAction extends Action { -666 public int fromServer; -667 public int fromRegion; -668 public int toServer; -669 public int toRegion; -670 public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) { -671 super(Type.SWAP_REGIONS); -672 this.fromServer = fromServer; -673 this.fromRegion = fromRegion; -674 this.toServer = toServer; -675 this.toRegion = toRegion; -676 } -677 @Override -678 public Action undoAction() { -679 return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion); -680 } -681 @Override -682 public String toString() { -683 return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; -684 } -685 } -686 -687 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION", -688 justification="Mistake. Too disruptive to change now") -689 public static final Action NullAction = new Action(Type.NULL); -690 -691 public void doAction(Action action) { -692 switch (action.type) { -693 case NULL: break; -694 case ASSIGN_REGION: -695 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings -696 assert action instanceof AssignRegionAction: action.getClass(); -697 AssignRegionAction ar = (AssignRegionAction) action; -698 regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region); -699 regionMoved(ar.region, -1, ar.server); -700 break; -701 case MOVE_REGION: -702 assert action instanceof MoveRegionAction: action.getClass(); -703 MoveRegionAction mra = (MoveRegionAction) action; -704 regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region); -705 regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region); -706 regionMoved(mra.region, mra.fromServer, mra.toServer); -707 break; -708 case SWAP_REGIONS: -709 assert action instanceof SwapRegionsAction: action.getClass(); -710 SwapRegionsAction a = (SwapRegionsAction) action; -711 regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion); -712 regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion); -713 regionMoved(a.fromRegion, a.fromServer, a.toServer); -714 regionMoved(a.toRegion, a.toServer, a.fromServer); -715 break; -716 default: -717 throw new RuntimeException("Uknown action:" + action.type); -718 } -719 } -720 -721 /** -722 * Return true if the placement of region on server would lower the availability -723 * of the region in question -724 * @return true or false -725 */ -726 boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) { -727 if (!serversToIndex.containsKey(serverName.getHostAndPort())) { -728 return false; // safeguard against race between 
cluster.servers and servers from LB method args -729 } -730 int server = serversToIndex.get(serverName.getHostAndPort()); -731 int region = regionsToIndex.get(regionInfo); -732 -733 int primary = regionIndexToPrimaryIndex[region]; -734 -735 // there is a subset relation for server < host < rack -736 // check server first -737 -738 if (contains(primariesOfRegionsPerServer[server], primary)) { -739 // check for whether there are other servers that we can place this region -740 for (int i = 0; i < primariesOfRegionsPerServer.length; i++) { -741 if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) { -742 return true; // meaning there is a better server -743 } -744 } -745 return false; // there is not a better server to place this -746 } -747 -748 // check host -749 if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host -750 int host = serverIndexToHostIndex[server]; -751 if (contains(primariesOfRegionsPerHost[host], primary)) { -752 // check for whether there are other hosts that we can place this region -753 for (int i = 0; i < primariesOfRegionsPerHost.length; i++) { -754 if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) { -755 return true; // meaning there is a better host -756 } -757 } -758 return false; // there is not a better host to place this -759 } -760 } -761 -762 // check rack -763 if (numRacks > 1) { -764 int rack = serverIndexToRackIndex[server]; -765 if (contains(primariesOfRegionsPerRack[rack], primary)) { -766 // check for whether there are other racks that we can place this region -767 for (int i = 0; i < primariesOfRegionsPerRack.length; i++) { -768 if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) { -769 return true; // meaning there is a better rack -770 } -771 } -772 return false; // there is not a better rack to place this -773 } -774 } -775 return false; -776 } -777 -778 void doAssignRegion(HRegionInfo regionInfo, ServerName serverName) { -779 if (!serversToIndex.containsKey(serverName.getHostAndPort())) { -780 return; -781 } -782 int server = serversToIndex.get(serverName.getHostAndPort()); -783 int region = regionsToIndex.get(regionInfo); -784 doAction(new AssignRegionAction(region, server)); -785 } -786 -787 void regionMoved(int region, int oldServer, int newServer) { -788 regionIndexToServerIndex[region] = newServer; -789 if (initialRegionIndexToServerIndex[region] == newServer) { -790 numMovedRegions--; //region moved back to original location -791 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { -792 numMovedRegions++; //region moved from original location -793 } -794 int tableIndex = regionIndexToTableIndex[region]; -795 if (oldServer >= 0) { -796 numRegionsPerServerPerTable[oldServer][tableIndex]--; -797 } -798 numRegionsPerServerPerTable[newServer][tableIndex]++; -799 -800 //check whether this caused maxRegionsPerTable in the new Server to be updated -801 if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { -802 numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex]; -803 } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1) -804 == numMaxRegionsPerTable[tableIndex]) { -805 //recompute maxRegionsPerTable since the previous value was coming from the old server -806 numMaxRegionsPerTable[tableIndex] = 0; -807 for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { -808 if 
(aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { -809 numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; -810 } -811 } -812 } -813 -814 // update for servers -815 int primary = regionIndexToPrimaryIndex[region]; -816 if (oldServer >= 0) { -817 primariesOfRegionsPerServer[oldServer] = removeRegion( -818 primariesOfRegionsPerServer[oldServer], primary); -819 } -820 primariesOfRegionsPerServer[newServer] = addRegionSorted( -821 primariesOfRegionsPerServer[newServer], primary); -822 -823 // update for hosts -824 if (multiServersPerHost) { -825 int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1; -826 int newHost = serverIndexToHostIndex[newServer]; -827 if (newHost != oldHost) { -828 regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region); -829 primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary); -830 if (oldHost >= 0) { -831 regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region); -832 primariesOfRegionsPerHost[oldHost] = removeRegion( -833 primariesOfRegionsPerHost[oldHost], primary); // will still be sorted -834 } -835 } -836 } -837 -838 // update for racks -839 if (numRacks > 1) { -840 int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1; -841 int newRack = serverIndexToRackIndex[newServer]; -842 if (newRack != oldRack) { -843 regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region); -844 primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary); -845 if (oldRack >= 0) { -846 regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region); -847 primariesOfRegionsPerRack[oldRack] = removeRegion( -848 primariesOfRegionsPerRack[oldRack], primary); // will still be sorted -849 } -850 } -851 } -852 } -853 -854 int[] removeRegion(int[] regions, int regionIndex) { -855 //TODO: this maybe costly. 
Consider using linked lists -856 int[] newRegions = new int[regions.length - 1]; -857 int i = 0; -858 for (i = 0; i < regions.length; i++) { -859 if (regions[i] == regionIndex) { -860 break; -861 } -862 newRegions[i] = regions[i]; -863 } -864 System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i); -865 return newRegions; -866 } -867 -868 int[] addRegion(int[] regions, int regionIndex) { -869 int[] newRegions = new int[regions.length + 1]; -870 System.arraycopy(regions, 0, newRegions, 0, regions.length); -871 newRegions[newRegions.length - 1] = regionIndex; -872 return newRegions; -873 } -874 -875 int[] addRegionSorted(int[] regions, int regionIndex) { -876 int[] newRegions = new int[regions.length + 1]; -877 int i = 0; -878 for (i = 0; i < regions.length; i++) { // find the index to insert -879 if (regions[i] > regionIndex) { -880 break; -881 } -882 } -883 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half -884 System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half -885 newRegions[i] = regionIndex; -886 -887 return newRegions; -888 } -889 -890 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { -891 int i = 0; -892 for (i = 0; i < regions.length; i++) { -893 if (regions[i] == regionIndex) { -894 regions[i] = newRegionIndex; -895 break; -896 } -897 } -898 return regions; -899 } -900 -901 void sortServersByRegionCount() { -902 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); -903 } -904 -905 int getNumRegions(int server) { -906 return regionsPerServer[server].length; -907 } -908 -909 boolean contains(int[] arr, int val) { -910 return Arrays.binarySearch(arr, val) >= 0; -911 } -912 -913 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); -914 -915 int getLowestLocalityRegionOnServer(int serverIndex) { -916 if (regionFinder != null) { -917 float lowestLocality = 1.0f; -918 int lowestLocalityRegionIndex = -1; -919 if (regionsPerServer[serverIndex].length == 0) { -920 // No regions on that region server -921 return -1; -922 } -923 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { -924 int regionIndex = regionsPerServer[serverIndex][j]; -925 HDFSBlocksDistribution distribution = regionFinder -926 .getBlockDistribution(regions[regionIndex]); -927 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); -928 // skip empty region -929 if (distribution.getUniqueBlocksTotalWeight() == 0) { -930 continue; -931 }
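The array helpers above (addRegion, addRegionSorted, removeRegion, contains) are the core of the Cluster bookkeeping: per-server, per-host and per-rack primary indexes are kept in plain sorted int[] arrays instead of hash sets, so membership checks reduce to a binary search. The following is a minimal, self-contained sketch of that technique, not part of BaseLoadBalancer; the class name SortedIntArrayDemo and its method names are hypothetical, chosen only to mirror the behavior of addRegionSorted, removeRegion and contains shown in the source above.

/*
 * Standalone illustration (hypothetical demo class) of the sorted int[] bookkeeping
 * used by BaseLoadBalancer.Cluster: copy-on-write insertion that preserves order,
 * copy-on-write removal, and membership via Arrays.binarySearch.
 */
import java.util.Arrays;

public class SortedIntArrayDemo {

  /** Insert regionIndex keeping the array sorted, in the style of addRegionSorted. */
  static int[] addSorted(int[] regions, int regionIndex) {
    int[] newRegions = new int[regions.length + 1];
    int i = 0;
    while (i < regions.length && regions[i] <= regionIndex) {
      i++;                                                      // find the insertion point
    }
    System.arraycopy(regions, 0, newRegions, 0, i);             // copy first half
    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
    newRegions[i] = regionIndex;
    return newRegions;
  }

  /** Remove regionIndex (assumed present, as in removeRegion); relative order is preserved. */
  static int[] remove(int[] regions, int regionIndex) {
    int[] newRegions = new int[regions.length - 1];
    int i = 0;
    for (; i < regions.length; i++) {
      if (regions[i] == regionIndex) {
        break;
      }
      newRegions[i] = regions[i];
    }
    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
    return newRegions;
  }

  /** Membership test via binary search, as in contains(); the array must be sorted. */
  static boolean contains(int[] arr, int val) {
    return Arrays.binarySearch(arr, val) >= 0;
  }

  public static void main(String[] args) {
    int[] primaries = new int[0];
    primaries = addSorted(primaries, 7);
    primaries = addSorted(primaries, 3);
    primaries = addSorted(primaries, 5);
    System.out.println(Arrays.toString(primaries)); // [3, 5, 7]
    System.out.println(contains(primaries, 5));     // true
    primaries = remove(primaries, 5);
    System.out.println(Arrays.toString(primaries)); // [3, 7]
  }
}

Because every mutation allocates a fresh array, these helpers trade allocation cost for compact storage and cache-friendly scans, which is the trade-off the TODO comment about linked lists is weighing.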