hbase-commits mailing list archives

From te...@apache.org
Subject hbase git commit: HBASE-13168 Backport HBASE-12590 "A solution for data skew in HBase-Mapreduce Job"
Date Wed, 11 Mar 2015 01:42:59 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 76ee124c6 -> 1b4f8afae


HBASE-13168 Backport HBASE-12590 "A solution for data skew in HBase-Mapreduce Job"


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1b4f8afa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1b4f8afa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1b4f8afa

Branch: refs/heads/0.98
Commit: 1b4f8afaec8cd4dfef46154bdceb31ce7ddf5982
Parents: 76ee124
Author: tedyu <yuzhihong@gmail.com>
Authored: Tue Mar 10 18:42:46 2015 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Tue Mar 10 18:42:46 2015 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/TableInputFormatBase.java   | 202 ++++++++++++++++++-
 .../mapreduce/TestTableInputFormatScan1.java    |  91 +++++++++
 .../mapreduce/TestTableInputFormatScanBase.java |  43 ++++
 3 files changed, 333 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
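
For context on how a client job would use this once applied: the feature is opt-in and is
driven entirely by the three configuration properties the patch introduces. Below is a minimal
sketch, not part of the commit; the table name "mytable" and the MyMapper class are
hypothetical placeholders.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.mapreduce.Job;

    public class AutoBalanceJobSketch {
      // Hypothetical pass-through mapper; any TableMapper subclass works here.
      static class MyMapper extends TableMapper<ImmutableBytesWritable, Result> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context ctx)
            throws IOException, InterruptedException {
          ctx.write(key, value);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Enable auto-balance of MapReduce input splits (disabled by default).
        conf.setBoolean("hbase.mapreduce.input.autobalance", true);
        // A region bigger than maxskewratio * averageRegionSize is cut in two;
        // runs of smaller regions are combined. The default ratio is 3.
        conf.setLong("hbase.mapreduce.input.autobalance.maxskewratio", 3);
        // Row keys are printable ASCII (32~126); set to false for binary keys.
        conf.setBoolean("hbase.table.row.textkey", true);
        Job job = new Job(conf, "skew-aware-scan");
        TableMapReduceUtil.initTableMapperJob("mytable", new Scan(), MyMapper.class,
            ImmutableBytesWritable.class, Result.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The patch itself reads these properties in getSplits() and getSplitKey(), as shown below.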


http://git-wip-us.apache.org/repos/asf/hbase/blob/1b4f8afa/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index acd28b6..5b27383 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -30,6 +30,7 @@ import javax.naming.NamingException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
@@ -89,6 +90,16 @@ import org.apache.hadoop.util.StringUtils;
 public abstract class TableInputFormatBase
 extends InputFormat<ImmutableBytesWritable, Result> {
 
+  /** Specify if we enable auto-balance for input in M/R jobs.*/
+  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
+  /** Specify the ratio for data skew in M/R jobs; it takes effect together with the
+   * hbase.mapreduce.input.autobalance property.*/
+  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
+          ".maxskewratio";
+  /** Specify whether the row keys in the table are text (ASCII between 32~126);
+   * default is true. False means the table uses binary row keys.*/
+  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
+
   final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
   /** Holds the details for the internal scanner. */
@@ -227,7 +238,26 @@ extends InputFormat<ImmutableBytesWritable, Result> {
         }
       }
     }
-    return splits;
+    //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
+    boolean enableAutoBalance = context.getConfiguration().getBoolean(
+      MAPREDUCE_INPUT_AUTOBALANCE, false);
+    if (enableAutoBalance) {
+      long totalRegionSize = 0;
+      for (int i = 0; i < splits.size(); i++) {
+        TableSplit ts = (TableSplit)splits.get(i);
+        totalRegionSize += ts.getLength();
+      }
+      long averageRegionSize = totalRegionSize / splits.size();
+      // the averageRegionSize must be positive.
+      if (averageRegionSize <= 0) {
+        LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
+            "set it to 1.");
+        averageRegionSize = 1;
+      }
+      return calculateRebalancedSplits(splits, context, averageRegionSize);
+    } else {
+      return splits;
+    }
   }
   
  public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
@@ -250,6 +280,170 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
 
   /**
+   * Calculates the number of MapReduce input splits for the map tasks. The number of
+   * MapReduce input splits depends on the average region size and the "data skew ratio" set by
+   * the user in the configuration.
+   *
+   * @param list  The list of input splits before balance.
+   * @param context  The current job context.
+   * @param average  The average size of all regions.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
+   */
+  public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
+                                               long average) throws IOException {
+    List<InputSplit> resultList = new ArrayList<InputSplit>();
+    Configuration conf = context.getConfiguration();
+    //The default data skew ratio is 3
+    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
+    //It determines which mode to use: text key mode or binary key mode. The default is text mode.
+    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
+    long dataSkewThreshold = dataSkewRatio * average;
+    int count = 0;
+    while (count < list.size()) {
+      TableSplit ts = (TableSplit)list.get(count);
+      String regionLocation = ts.getRegionLocation();
+      long regionSize = ts.getLength();
+      if (regionSize >= dataSkewThreshold) {
+        // if the current region size is larger than the data skew threshold,
+        // split the region into two MapReduce input splits.
+        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
+         //Set the size of each child TableSplit to 1/2 of the region size. The exact size of the
+         // MapReduce input splits is not far off.
+        TableSplit t1 = new TableSplit(table.getName(), ts.getStartRow(), splitKey, regionLocation,
+                regionSize / 2);
+        TableSplit t2 = new TableSplit(table.getName(), splitKey, ts.getEndRow(), regionLocation,
+                regionSize - regionSize / 2);
+        resultList.add(t1);
+        resultList.add(t2);
+        count++;
+      } else if (regionSize >= average) {
+        // if the region size is between the average size and the data skew threshold,
+        // make this region one MapReduce input split.
+        resultList.add(ts);
+        count++;
+      } else {
+        // if the total size of several small continuous regions is less than the average
+        // region size, combine them into one MapReduce input split.
+        long totalSize = regionSize;
+        byte[] splitStartKey = ts.getStartRow();
+        byte[] splitEndKey = ts.getEndRow();
+        count++;
+        for (; count < list.size(); count++) {
+          TableSplit nextRegion = (TableSplit)list.get(count);
+          long nextRegionSize = nextRegion.getLength();
+          if (totalSize + nextRegionSize <= dataSkewThreshold) {
+            totalSize = totalSize + nextRegionSize;
+            splitEndKey = nextRegion.getEndRow();
+          } else {
+            break;
+          }
+        }
+        TableSplit t = new TableSplit(table.getName(), splitStartKey, splitEndKey,
+                regionLocation, totalSize);
+        resultList.add(t);
+      }
+    }
+    return resultList;
+  }
+
+  /**
+   * Select a split point in the region. The selection of the split point is based on a uniform
+   * distribution assumption for the keys in a region.
+   * Here are some examples:
+   * startKey: aaabcdefg  endKey: aaafff    split point: aaad
+   * startKey: 111000  endKey: 1125790    split point: 111b
+   * startKey: 1110  endKey: 1120    split point: 111_
+   * startKey: binary key { 13, -19, 126, 127 }, endKey: binary key { 13, -19, 127, 0 },
+   * split point: binary key { 13, -19, 127, -64 }
+   * This function is "public static" to make testing easier.
+   *
+   * @param start Start key of the region
+   * @param end End key of the region
+   * @param isText It determines to use text key mode or binary key mode
+   * @return The split point in the region.
+   */
+  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
+    byte upperLimitByte;
+    byte lowerLimitByte;
+    //Use text mode or binary mode.
+    if (isText) {
+      //The range of text char set in ASCII is [32,126], the lower limit is space and the upper
+      // limit is '~'.
+      upperLimitByte = '~';
+      lowerLimitByte = ' ';
+    } else {
+      upperLimitByte = Byte.MAX_VALUE;
+      lowerLimitByte = Byte.MIN_VALUE;
+    }
+    // Special cases:
+    // Example 1: startKey=null, endKey="hhhqqqwww", splitKey="h"
+    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitKey="f~~~~~~"
+    if (start.length == 0 && end.length == 0) {
+      return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
+    }
+    if (start.length == 0 && end.length != 0) {
+      return new byte[]{ end[0] };
+    }
+    if (start.length != 0 && end.length == 0) {
+      byte[] result = new byte[start.length];
+      result[0] = start[0];
+      for (int k = 1; k < start.length; k++) {
+        result[k] = upperLimitByte;
+      }
+      return result;
+    }
+    // A list to store bytes in split key
+    List<Byte> resultBytesList = new ArrayList<Byte>();
+    int maxLength = start.length > end.length ? start.length : end.length;
+    for (int i = 0; i < maxLength; i++) {
+      //calculate the midpoint byte between the first difference
+      //for example: "11ae" and "11chw", the midpoint is "11b"
+      //another example: "11ae" and "11bhw", the first different byte is 'a' and 'b',
+      // there is no midpoint between 'a' and 'b', so we need to check the next byte.
+      if (start[i] == end[i]) {
+        resultBytesList.add(start[i]);
+        //For a special case like: startKey="aaa", endKey="aaaz", splitKey="aaaM"
+        if (i + 1 == start.length) {
+          resultBytesList.add((byte) ((lowerLimitByte + end[i + 1]) / 2));
+          break;
+        }
+      } else {
+        //if the two bytes differ by 1, like ['a','b'], we need to check the next byte to find
+        // the midpoint.
+        if ((int)end[i] - (int)start[i] == 1) {
+          //get next byte after the first difference
+          byte startNextByte = (i + 1 < start.length) ? start[i + 1] : lowerLimitByte;
+          byte endNextByte = (i + 1 < end.length) ? end[i + 1] : lowerLimitByte;
+          int byteRange = (upperLimitByte - startNextByte) + (endNextByte - lowerLimitByte) + 1;
+          int halfRange = byteRange / 2;
+          if ((int)startNextByte + halfRange > (int)upperLimitByte) {
+            resultBytesList.add(end[i]);
+            resultBytesList.add((byte) (startNextByte + halfRange - upperLimitByte +
+                    lowerLimitByte));
+          } else {
+            resultBytesList.add(start[i]);
+            resultBytesList.add((byte) (startNextByte + halfRange));
+          }
+        } else {
+          //calculate the midpoint key by the first different byte (normal case),
+          // like "11ae" and "11chw", the midpoint is "11b"
+          resultBytesList.add((byte) ((start[i] + end[i]) / 2));
+        }
+        break;
+      }
+    }
+    //transform the List of bytes to byte[]
+    byte[] result = new byte[resultBytesList.size()];
+    for (int k = 0; k < resultBytesList.size(); k++) {
+      result[k] = (byte) resultBytesList.get(k);
+    }
+    return result;
+  }
+
+  /**
    *
    *
    * Test if the given region is to be included in the InputSplit while splitting
@@ -258,12 +452,14 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
   * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
   * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing,
-   * continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys.
+   * continuously. In addition to reducing InputSplits, reduces the load on the region server as
+   * well, due to the ordering of the keys.
   * <br>
   * <br>
   * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region.
   * <br>
-   * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( i.e. all regions are included).
+   * Override this method, if you want to bulk exclude regions altogether from M-R.
+   * By default, no region is excluded (i.e. all regions are included).
    *
    *
    * @param startKey Start key of the region

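Since the patch makes getSplitKey() "public static", the javadoc examples above can be checked
outside of a full MapReduce job. A small illustrative driver (the class name SplitKeyDemo is
invented here; the key values are copied from the javadoc examples):

    import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SplitKeyDemo {
      public static void main(String[] args) {
        // Text mode: the first differing bytes are 'b' (98) and 'f' (102), so the
        // midpoint byte is 'd' (100) and the split key is "aaad".
        byte[] textSplit = TableInputFormatBase.getSplitKey(
            Bytes.toBytes("aaabcdefg"), Bytes.toBytes("aaafff"), true);
        System.out.println(Bytes.toString(textSplit));        // prints "aaad"

        // Binary mode: 126 and 127 differ by 1, so the next byte pair decides and the
        // midpoint carries past the upper limit, giving { 13, -19, 127, -64 }.
        byte[] binSplit = TableInputFormatBase.getSplitKey(
            new byte[] { 13, -19, 126, 127 }, new byte[] { 13, -19, 127, 0 }, false);
        System.out.println(Bytes.toStringBinary(binSplit));
      }
    }
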
http://git-wip-us.apache.org/repos/asf/hbase/blob/1b4f8afa/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
index 490e89a..143b70c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
@@ -19,7 +19,10 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -96,4 +99,92 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
     testScan(null, "opp", "opo");
   }
 
+  /**
+   * Tests an MR scan using a specific number of mappers. The test table has 25 regions,
+   * and all region sizes are set to 0 by default. The average region size is 1 (the smallest
+   * positive value). When we set hbase.mapreduce.input.autobalance.maxskewratio to -1, every
+   * region will be cut into two MapReduce input splits, so the number of MR input splits should
+   * be 50; when we set it to 100, the sum of all region sizes is less than the average region
+   * size, and all regions will be combined into 1 MapReduce input split.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
+    testNumOfSplits("-1", 50);
+    testNumOfSplits("100", 1);
+  }
+
+  /**
+   * Tests the getSplitKey() method in TableInputFormatBase.java
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testGetSplitsPoint() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    // Test Case 1: "aaabcdef" and "aaaff", split point is "aaad".
+    byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' };
+    byte[] end1 = { 'a', 'a', 'a', 'f', 'f' };
+    byte[] splitPoint1 = { 'a', 'a', 'a', 'd' };
+    testGetSplitKey(start1, end1, splitPoint1, true);
+
+    // Test Case 2: "111000" and "1125790", split point is "111b".
+    byte[] start2 = { '1', '1', '1', '0', '0', '0' };
+    byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' };
+    byte[] splitPoint2 = { '1', '1', '1', 'b' };
+    testGetSplitKey(start2, end2, splitPoint2, true);
+
+    // Test Case 3: "aaaaaa" and "aab", split point is "aaap".
+    byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' };
+    byte[] end3 = { 'a', 'a', 'b' };
+    byte[] splitPoint3 = { 'a', 'a', 'a', 'p' };
+    testGetSplitKey(start3, end3, splitPoint3, true);
+
+    // Test Case 4: "aaa" and "aaaz", split point is "aaaM".
+    byte[] start4 = { 'a', 'a', 'a' };
+    byte[] end4 = { 'a', 'a', 'a', 'z' };
+    byte[] splitPoint4 = { 'a', 'a', 'a', 'M' };
+    testGetSplitKey(start4, end4, splitPoint4, true);
+
+    // Test Case 5: "aaa" and "aaba", split point is "aaap".
+    byte[] start5 = { 'a', 'a', 'a' };
+    byte[] end5 = { 'a', 'a', 'b', 'a' };
+    byte[] splitPoint5 = { 'a', 'a', 'a', 'p' };
+    testGetSplitKey(start5, end5, splitPoint5, true);
+
+    // Test Case 6: empty key and "hhhqqqwww", split point is "h"
+    byte[] start6 = {};
+    byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w', 'w' };
+    byte[] splitPoint6 = { 'h' };
+    testGetSplitKey(start6, end6, splitPoint6, true);
+
+    // Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose (text key
+    // or binary key).
+    byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' };
+    byte[] end7 = {};
+    byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~'  };
+    byte[] splitPointBinary7 = { 'f', 127, 127, 127, 127, 127, 127  };
+    testGetSplitKey(start7, end7, splitPointText7, true);
+    testGetSplitKey(start7, end7, splitPointBinary7, false);
+
+    // Test Case 8: both start key and end key are empty. Split point depends on the mode we
+    // choose (text key or binary key).
+    byte[] start8 = {};
+    byte[] end8 = {};
+    byte[] splitPointText8 = { 'O' };
+    byte[] splitPointBinary8 = { 0 };
+    testGetSplitKey(start8, end8, splitPointText8, true);
+    testGetSplitKey(start8, end8, splitPointBinary8, false);
+
+    // Test Case 9: Binary Key example
+    byte[] start9 = { 13, -19, 126, 127 };
+    byte[] end9 = { 13, -19, 127, 0 };
+    byte[] splitPoint9 = { 13, -19, 127, -64 };
+    testGetSplitKey(start9, end9, splitPoint9, false);
+  }
 }

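A note on the expected counts in testGetSplits() above: all 25 regions report length 0, so
getSplits() clamps the average region size to 1 before rebalancing. The following standalone
sketch (purely illustrative, a simplified trace rather than the real code path) shows why the
two ratios yield 50 and 1 splits:

    public class SplitCountWalkthrough {
      public static void main(String[] args) {
        int regions = 25;
        long average = 1;                 // clamped: every region reports length 0
        long regionSize = 0;

        // Case 1: maxskewratio = -1 gives threshold -1. Every region size (0) is
        // >= -1, so each region takes the "split in two" branch: 25 * 2 = 50.
        long threshold1 = -1 * average;
        int splits1 = (regionSize >= threshold1) ? regions * 2 : regions;

        // Case 2: maxskewratio = 100 gives threshold 100. Region sizes are below the
        // average, and the running total (0) never exceeds the threshold, so the
        // combine branch folds all 25 regions into a single split.
        long threshold2 = 100 * average;
        long total = 0;
        int splits2 = 0;
        for (int i = 0; i < regions; i++) {
          if (splits2 == 0 || total + regionSize > threshold2) {
            splits2++;                    // open a new combined split
            total = regionSize;
          } else {
            total += regionSize;          // keep combining into the current split
          }
        }

        System.out.println(splits1 + " and " + splits2);  // prints "50 and 1"
      }
    }
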
http://git-wip-us.apache.org/repos/asf/hbase/blob/1b4f8afa/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
index 697289e..a29b53c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 
@@ -30,18 +32,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 
+
 /**
  * <p>
  * Tests various scan start and stop row scenarios. This is set in a scan and
@@ -239,5 +245,42 @@ public abstract class TestTableInputFormatScanBase {
     LOG.info("After map/reduce completion - job " + jobName);
   }
 
+
+  /**
+   * Tests an MR scan using data skew auto-balance.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
+          InterruptedException, ClassNotFoundException {
+    String jobName = "TestJobForNumOfSplits";
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    c.set("hbase.mapreduce.input.autobalance", "true");
+    c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
+    c.set(KEY_STARTROW, "");
+    c.set(KEY_LASTROW, "");
+    Job job = new Job(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    TableInputFormat tif = new TableInputFormat();
+    tif.setConf(job.getConfiguration());
+    Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
+    List<InputSplit> splits = tif.getSplits(job);
+    Assert.assertEquals(expectedNumOfSplits, splits.size());
+  }
+
+  /**
+   * Tests for the getSplitKey() method in TableInputFormatBase.java
+   */
+  public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
+    byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
+    Assert.assertArrayEquals(splitKey, result);
+  }
 }
 

