hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [20/24] hbase git commit: HBASE-15482 Provide an option to skip calculating block locations for SnapshotInputFormat
Date Wed, 20 Dec 2017 01:30:27 GMT
HBASE-15482 Provide an option to skip calculating block locations for SnapshotInputFormat

Signed-off-by: tedyu <yuzhihong@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5e7d16a3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5e7d16a3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5e7d16a3

Branch: refs/heads/HBASE-19397
Commit: 5e7d16a3ceaeec5057474f9bae2d40d306f6dd8e
Parents: 70608ac
Author: Xiang Li <waterlx@gmail.com>
Authored: Thu Dec 7 01:06:33 2017 +0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Tue Dec 19 15:52:16 2017 -0800

----------------------------------------------------------------------
 .../mapreduce/TableSnapshotInputFormatImpl.java | 85 ++++++++++++++------
 .../mapred/TestTableSnapshotInputFormat.java    | 27 ++++++-
 .../TableSnapshotInputFormatTestBase.java       |  7 +-
 .../mapreduce/TestTableSnapshotInputFormat.java | 23 +++++-
 4 files changed, 110 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5e7d16a3/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index bee4926..53eb9f4 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -70,7 +70,7 @@ public class TableSnapshotInputFormatImpl {
   // key for specifying the root dir of the restored snapshot
   protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
 
-  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
+  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
   private static final String LOCALITY_CUTOFF_MULTIPLIER =
     "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
   private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
@@ -87,6 +87,19 @@ public class TableSnapshotInputFormatImpl {
   public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";
 
   /**
+   * Whether to calculate the block location for splits. Defaults to true.
+   * If the computing layer runs outside of HBase cluster, the block locality does not matter.
+   * Setting this value to false could skip the calculation and save some time.
+   *
+   * Set access modifier to "public" so that these could be accessed by test classes of
+   * both org.apache.hadoop.hbase.mapred
+   * and  org.apache.hadoop.hbase.mapreduce.
+   */
+  public static final String  SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
+      "hbase.TableSnapshotInputFormat.locality.enabled";
+  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;
+
+  /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
   public static class InputSplit implements Writable {
@@ -356,6 +369,9 @@ public class TableSnapshotInputFormatImpl {
 
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 
+    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
+                                              SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);
+
     List<InputSplit> splits = new ArrayList<>();
     for (HRegionInfo hri : regionManifests) {
       // load region descriptor
@@ -365,36 +381,42 @@ public class TableSnapshotInputFormatImpl {
         for (int i = 0; i < sp.length - 1; i++) {
           if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
                   sp[i + 1])) {
-            // compute HDFS locations from snapshot files (which will get the locations for
-            // referred hfiles)
-            List<String> hosts = getBestLocations(conf,
-                    HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+            List<String> hosts =
+                calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
 
-            int len = Math.min(3, hosts.size());
-            hosts = hosts.subList(0, len);
             Scan boundedScan = new Scan(scan);
             boundedScan.setStartRow(sp[i]);
             boundedScan.setStopRow(sp[i + 1]);
+
             splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
           }
         }
       } else {
         if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
-          hri.getStartKey(), hri.getEndKey())) {
-          // compute HDFS locations from snapshot files (which will get the locations for
-          // referred hfiles)
-          List<String> hosts = getBestLocations(conf,
-                  HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
-
-          int len = Math.min(3, hosts.size());
-          hosts = hosts.subList(0, len);
+            hri.getStartKey(), hri.getEndKey())) {
+          List<String> hosts =
+              calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
           splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
         }
       }
     }
 
     return splits;
+  }
 
+  /**
+   * Compute block locations for snapshot files (which will get the locations for referred hfiles)
+   * only when localityEnabled is true.
+   */
+  private static List<String> calculateLocationsForInputSplit(Configuration conf,
+      TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled)
+      throws IOException {
+    if (localityEnabled) { // care block locality
+      return getBestLocations(conf,
+                              HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+    } else { // do not care block locality
+      return null;
+    }
   }
 
   /**
@@ -408,30 +430,41 @@ public class TableSnapshotInputFormatImpl {
    * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
    * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
    * host with the best locality.
+   * Return at most numTopsAtMost locations if there are more than that.
    */
-  public static List<String> getBestLocations(
-      Configuration conf, HDFSBlocksDistribution blockDistribution) {
-    List<String> locations = new ArrayList<>(3);
-
+  private static List<String> getBestLocations(Configuration conf,
+      HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {
     HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
 
-    if (hostAndWeights.length == 0) {
-      return locations;
+    if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is
+      return null;
     }
 
+    if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1
+      numTopsAtMost = 1;
+    }
+    int top = Math.min(numTopsAtMost, hostAndWeights.length);
+    List<String> locations = new ArrayList<>(top);
     HostAndWeight topHost = hostAndWeights[0];
     locations.add(topHost.getHost());
 
-    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
+    if (top == 1) { // only care about the top host
+      return locations;
+    }
+
+    // When top >= 2,
+    // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
     double cutoffMultiplier
             = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
 
     double filterWeight = topHost.getWeight() * cutoffMultiplier;
 
-    for (int i = 1; i < hostAndWeights.length; i++) {
+    for (int i = 1; i <= top - 1; i++) {
       if (hostAndWeights[i].getWeight() >= filterWeight) {
         locations.add(hostAndWeights[i].getHost());
       } else {
+        // As hostAndWeights is in descending order,
+        // we could break the loop as long as we meet a weight which is less than filterWeight.
         break;
       }
     }
@@ -439,6 +472,12 @@ public class TableSnapshotInputFormatImpl {
     return locations;
   }
 
+  public static List<String> getBestLocations(Configuration conf,
+      HDFSBlocksDistribution blockDistribution) {
+    // 3 nodes will contain highly local blocks. So default to 3.
+    return getBestLocations(conf, blockDistribution, 3);
+  }
+
   private static String getSnapshotName(Configuration conf) {
     String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
     if (snapshotName == null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e7d16a3/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
index be36b6a..8b4e918 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.mapred;
 
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
 import static org.mockito.Mockito.mock;
 
 import org.apache.hadoop.fs.Path;
@@ -138,7 +139,10 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   @Test
   @Override
   public void testWithMockedMapReduceMultiRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10);
+    testWithMockedMapReduce(
+        UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
+        // It does not matter whether true or false is given to setLocalityEnabledTo,
+        // because it is not read in testWithMockedMapReduce().
   }
 
   @Test
@@ -165,7 +169,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
 
   @Override
   protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
-      int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
+      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
+      throws Exception {
     setupCluster();
     final TableName tableName = TableName.valueOf(name.getMethodName());
     try {
@@ -173,6 +178,9 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
         util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
 
       JobConf job = new JobConf(util.getConfiguration());
+      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case that
+      // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
+      // and the default value is taken.
       Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
 
       if (numSplitsPerRegion > 1) {
@@ -206,10 +214,25 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     HBaseTestingUtility.SeenRowTracker rowTracker =
       new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
 
+    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
+    // so the default value is taken.
+    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
+
     for (int i = 0; i < splits.length; i++) {
       // validate input split
       InputSplit split = splits[i];
       Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
+      if (localityEnabled) {
+        // When localityEnabled is true, meant to verify split.getLocations()
+        // by the following statement:
+        //   Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
+        // However, getLocations() of some splits could return an empty array (length is 0),
+        // so drop the verification on length.
+        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
+        Assert.assertTrue(split.getLocations() != null);
+      } else {
+        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
+      }
 
       // validate record reader
       OutputCollector collector = mock(OutputCollector.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e7d16a3/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 362dca1..4e11275 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -78,7 +78,8 @@ public abstract class TableSnapshotInputFormatTestBase {
   }
 
   protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
-    int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception;
+    int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
+    throws Exception;
 
   protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
     String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits,
@@ -90,12 +91,12 @@ public abstract class TableSnapshotInputFormatTestBase {
 
   @Test
   public void testWithMockedMapReduceSingleRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1);
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1, true);
   }
 
   @Test
   public void testWithMockedMapReduceMultiRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8);
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8, false);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e7d16a3/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 890eb2f..2ed6081 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hbase.mapreduce;
 
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -98,7 +101,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     Configuration conf = UTIL.getConfiguration();
 
     HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
-    Assert.assertEquals(Lists.newArrayList(),
+    Assert.assertEquals(null,
       TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
 
     blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
@@ -132,7 +135,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
     blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
 
-    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"),
+    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
       TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
   }
 
@@ -210,14 +213,17 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
 
   @Override
   public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
-      int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
+      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
+      throws Exception {
     setupCluster();
     final TableName tableName = TableName.valueOf(name.getMethodName());
     try {
       createTableAndSnapshot(
         util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
 
-      Job job = new Job(util.getConfiguration());
+      Configuration conf = util.getConfiguration();
+      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
+      Job job = new Job(conf);
       Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
       Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
 
@@ -304,10 +310,19 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     HBaseTestingUtility.SeenRowTracker rowTracker =
         new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
 
+    boolean localityEnabled =
+        job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
+                                          SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);
+
     for (int i = 0; i < splits.size(); i++) {
       // validate input split
       InputSplit split = splits.get(i);
       Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+      if (localityEnabled) {
+        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
+      } else {
+        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
+      }
 
       // validate record reader
       TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);


Mime
View raw message