hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject hbase git commit: HBASE-16570 Compute region locality in parallel at startup (addendum)
Date Thu, 10 Nov 2016 08:54:11 GMT
Repository: hbase
Updated Branches:
  refs/heads/master e5a288e5c -> 7f08cd0e1


HBASE-16570 Compute region locality in parallel at startup (addendum)

Addendum mainly for:
1. Avoid interfering with block location cache in RegionLocationFinder
2. Avoid refreshing block locations during HMaster startup (or else the startup could be really
slow)

Signed-off-by: Yu Li <liyu@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7f08cd0e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7f08cd0e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7f08cd0e

Branch: refs/heads/master
Commit: 7f08cd0e104f386881f21c4e7148b3b969d8b675
Parents: e5a288e
Author: binlijin <binlijin@gmail.com>
Authored: Thu Nov 10 16:47:31 2016 +0800
Committer: Yu Li <liyu@apache.org>
Committed: Thu Nov 10 16:53:07 2016 +0800

----------------------------------------------------------------------
 .../hbase/master/balancer/BaseLoadBalancer.java | 70 ++++++------------
 .../master/balancer/RegionLocationFinder.java   | 78 +++++++++++++-------
 .../master/balancer/TestBaseLoadBalancer.java   | 51 ++-----------
 .../balancer/TestRegionLocationFinder.java      | 26 ++++++-
 4 files changed, 106 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7f08cd0e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 2b13b21..f71f8f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -33,7 +33,6 @@ import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
@@ -60,7 +59,6 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * The base class for load balancers. It provides the the functions used to by
@@ -119,7 +117,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     HRegionInfo[] regions;
     Deque<RegionLoad>[] regionLoads;
     private RegionLocationFinder regionFinder;
-    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures;
 
     int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
 
@@ -169,8 +166,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         Map<String, Deque<RegionLoad>> loads,
         RegionLocationFinder regionFinder,
         RackManager rackManager) {
-      this(null, clusterState, loads, regionFinder,
-        rackManager);
+      this(null, clusterState, loads, regionFinder, rackManager);
     }
 
     @SuppressWarnings("unchecked")
@@ -241,13 +237,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       regionIndexToTableIndex = new int[numRegions];
       regionIndexToPrimaryIndex = new int[numRegions];
       regionLoads = new Deque[numRegions];
-      regionLocationFutures = new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(
-          numRegions);
-      if (regionFinder != null) {
-        for (int i = 0; i < numRegions; i++) {
-          regionLocationFutures.add(null);
-        }
-      }
+
       regionLocations = new int[numRegions][];
       serverIndicesSortedByRegionCount = new Integer[numServers];
       serverIndicesSortedByLocality = new Integer[numServers];
@@ -307,43 +297,16 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
         for (HRegionInfo region : entry.getValue()) {
           registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
-
           regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
           regionIndex++;
         }
       }
+
       for (HRegionInfo region : unassignedRegions) {
         registerRegion(region, regionIndex, -1, loads, regionFinder);
         regionIndex++;
       }
 
-      if (regionFinder != null) {
-        for (int index = 0; index < regionLocationFutures.size(); index++) {
-          ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
-              .get(index);
-          HDFSBlocksDistribution blockDistbn = null;
-          try {
-            blockDistbn = future.get();
-          } catch (InterruptedException ite) {
-          } catch (ExecutionException ee) {
-            LOG.debug(
-                "IOException during HDFSBlocksDistribution computation. for region = "
-                    + regions[index].getEncodedName(), ee);
-          } finally {
-            if (blockDistbn == null) {
-              blockDistbn = new HDFSBlocksDistribution();
-            }
-          }
-          List<ServerName> loc = regionFinder.getTopBlockLocations(blockDistbn);
-          regionLocations[index] = new int[loc.size()];
-          for (int i = 0; i < loc.size(); i++) {
-            regionLocations[index][i] = loc.get(i) == null ? -1
-                : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
-                    : serversToIndex.get(loc.get(i).getHostAndPort()));
-          }
-        }
-      }
-
       for (int i = 0; i < serversPerHostList.size(); i++) {
         serversPerHost[i] = new int[serversPerHostList.get(i).size()];
         for (int j = 0; j < serversPerHost[i].length; j++) {
@@ -464,8 +427,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     }
 
     /** Helper for Cluster constructor to handle a region */
-    private void registerRegion(HRegionInfo region, int regionIndex, int serverIndex,
-        Map<String, Deque<RegionLoad>> loads, RegionLocationFinder regionFinder)
{
+    private void registerRegion(HRegionInfo region, int regionIndex,
+        int serverIndex, Map<String, Deque<RegionLoad>> loads,
+        RegionLocationFinder regionFinder) {
       String tableName = region.getTable().getNameAsString();
       if (!tablesToIndex.containsKey(tableName)) {
         tables.add(tableName);
@@ -492,8 +456,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
       if (regionFinder != null) {
         // region location
-        regionLocationFutures.set(regionIndex,
-            regionFinder.asyncGetBlockDistribution(region));
+        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
+        regionLocations[regionIndex] = new int[loc.size()];
+        for (int i = 0; i < loc.size(); i++) {
+          regionLocations[regionIndex][i] = loc.get(i) == null ? -1
+              : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
+                  : serversToIndex.get(loc.get(i).getHostAndPort()));
+        }
       }
     }
 
@@ -1277,7 +1246,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       return assignments;
     }
 
-    Cluster cluster = createCluster(servers, regions);
+    Cluster cluster = createCluster(servers, regions, false);
     List<HRegionInfo> unassignedRegions = new ArrayList<HRegionInfo>();
 
     roundRobinAssignment(cluster, regions, unassignedRegions,
@@ -1324,7 +1293,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   }
 
   protected Cluster createCluster(List<ServerName> servers,
-      Collection<HRegionInfo> regions) {
+      Collection<HRegionInfo> regions, boolean forceRefresh) {
+    if (forceRefresh == true) {
+      regionFinder.refreshAndWait(regions);
+    }
     // Get the snapshot of the current assignments for the regions in question, and then
create
     // a cluster out of it. Note that we might have replicas already assigned to some servers
     // earlier. So we want to get the snapshot to see those assignments, but this will only
contain
@@ -1337,7 +1309,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       }
     }
     return new Cluster(regions, clusterState, null, this.regionFinder,
-      rackManager);
+        rackManager);
   }
 
   /**
@@ -1365,7 +1337,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     }
 
     List<HRegionInfo> regions = Lists.newArrayList(regionInfo);
-    Cluster cluster = createCluster(servers, regions);
+    Cluster cluster = createCluster(servers, regions, false);
     return randomAssignment(cluster, regionInfo, servers);
   }
 
@@ -1440,7 +1412,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     int numRandomAssignments = 0;
     int numRetainedAssigments = 0;
 
-    Cluster cluster = createCluster(servers, regions.keySet());
+    Cluster cluster = createCluster(servers, regions.keySet(), true);
 
     for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet()) {
       HRegionInfo region = entry.getKey();

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f08cd0e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
index fbe57d0..d5edfab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -17,15 +17,17 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,17 +45,15 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * This will find where data for a region is located in HDFS. It ranks
@@ -70,7 +70,8 @@ class RegionLocationFinder {
   private volatile ClusterStatus status;
   private MasterServices services;
   private final ListeningExecutorService executor;
-  private long lastFullRefresh = 0;
+  // Do not scheduleFullRefresh at master startup
+  private long lastFullRefresh = EnvironmentEdgeManager.currentTime();
 
   private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
       new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {
@@ -167,9 +168,8 @@ class RegionLocationFinder {
     return includesUserTables;
   }
 
-  protected List<ServerName> getTopBlockLocations(
-      HDFSBlocksDistribution blocksDistribution) {
-    List<String> topHosts = blocksDistribution.getTopHosts();
+  protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
+    List<String> topHosts = getBlockDistribution(region).getTopHosts();
     return mapHostNameToServerName(topHosts);
   }
 
@@ -299,7 +299,7 @@ class RegionLocationFinder {
     }
   }
 
-  public ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
+  private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
       HRegionInfo hri) {
     try {
       return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
@@ -307,4 +307,32 @@ class RegionLocationFinder {
       return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
     }
   }
+
+  public void refreshAndWait(Collection<HRegionInfo> hris) {
+    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures
=
+        new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(hris.size());
+    for (HRegionInfo hregionInfo : hris) {
+      regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
+    }
+    int index = 0;
+    for (HRegionInfo hregionInfo : hris) {
+      ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
+          .get(index);
+      try {
+        cache.put(hregionInfo, future.get());
+      } catch (InterruptedException ite) {
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException ee) {
+        LOG.debug(
+            "ExecutionException during HDFSBlocksDistribution computation. for region = "
+                + hregionInfo.getEncodedName(), ee);
+      }
+      index++;
+    }
+  }
+
+  // For test
+  LoadingCache<HRegionInfo, HDFSBlocksDistribution> getCache() {
+    return cache;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f08cd0e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
index 37165d3..b0b0a2b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
@@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -57,8 +56,6 @@ import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 
 @Category({MasterTests.class, MediumTests.class})
 public class TestBaseLoadBalancer extends BalancerTestBase {
@@ -451,49 +448,17 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
 
     // mock block locality for some regions
     RegionLocationFinder locationFinder = mock(RegionLocationFinder.class);
-    HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution();
-    ListenableFuture<HDFSBlocksDistribution> defaultFuture = Futures
-        .immediateFuture(emptyBlockDistribution);
-    for (HRegionInfo regionInfo : regions) {
-      when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn(
-          defaultFuture);
-    }
     // block locality: region:0   => {server:0}
     //                 region:1   => {server:0, server:1}
     //                 region:42 => {server:4, server:9, server:5}
-    HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution();
-    ListenableFuture<HDFSBlocksDistribution> future0 = Futures
-        .immediateFuture(region0BlockDistribution);
-    when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn(
-        future0);
-    when(locationFinder.getTopBlockLocations(region0BlockDistribution))
-        .thenReturn(Lists.newArrayList(servers.get(0)));
-
-    HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution();
-    ListenableFuture<HDFSBlocksDistribution> future1 = Futures
-        .immediateFuture(region1BlockDistribution);
-    when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn(
-        future1);
-    when(locationFinder.getTopBlockLocations(region1BlockDistribution))
-        .thenReturn(Lists.newArrayList(servers.get(0), servers.get(1)));
-
-    HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution();
-    ListenableFuture<HDFSBlocksDistribution> future42 = Futures
-        .immediateFuture(region42BlockDistribution);
-    when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn(
-        future42);
-    when(locationFinder.getTopBlockLocations(region42BlockDistribution))
-        .thenReturn(
-            Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
-
-    HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution();
-    ListenableFuture<HDFSBlocksDistribution> future43 = Futures
-        .immediateFuture(region43BlockDistribution);
-    when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn(
-        future43);
-    // this server does not exists in clusterStatus
-    when(locationFinder.getTopBlockLocations(region43BlockDistribution))
-        .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0)));
+    when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn(
+        Lists.newArrayList(servers.get(0)));
+    when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn(
+        Lists.newArrayList(servers.get(0), servers.get(1)));
+    when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn(
+        Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
+    when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
+        Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists
in clusterStatus
 
     BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f08cd0e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
index 039cac1..f18d722 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master.balancer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
@@ -25,6 +26,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -121,8 +123,8 @@ public class TestRegionLocationFinder {
     for (int i = 0; i < ServerNum; i++) {
       HRegionServer server = cluster.getRegionServer(i);
       for (Region region : server.getOnlineRegions(tableName)) {
-        List<ServerName> servers = finder.getTopBlockLocations(finder
-            .getBlockDistribution(region.getRegionInfo()));
+        List<ServerName> servers = finder.getTopBlockLocations(region
+            .getRegionInfo());
         // test table may have empty region
         if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) {
           continue;
@@ -139,4 +141,24 @@ public class TestRegionLocationFinder {
       }
     }
   }
+
+  @Test
+  public void testRefreshAndWait() throws Exception {
+    finder.getCache().invalidateAll();
+    for (int i = 0; i < ServerNum; i++) {
+      HRegionServer server = cluster.getRegionServer(i);
+      List<Region> regions = server.getOnlineRegions(tableName);
+      if (regions.size() <= 0) {
+        continue;
+      }
+      List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>(regions.size());
+      for (Region region : regions) {
+        regionInfos.add(region.getRegionInfo());
+      }
+      finder.refreshAndWait(regionInfos);
+      for (HRegionInfo regionInfo : regionInfos) {
+        assertNotNull(finder.getCache().getIfPresent(regionInfo));
+      }
+    }
+  }
 }


Mime
View raw message