From bus...@apache.org
Subject [52/60] [abbrv] hbase git commit: HBASE-16894 Create more than 1 split per region, generalize HBASE-12590
Date Fri, 06 Oct 2017 04:15:31 GMT
HBASE-16894 Create more than 1 split per region, generalize HBASE-12590

Signed-off-by: Andrew Purtell <apurtell@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/16d483f9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/16d483f9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/16d483f9

Branch: refs/heads/HBASE-18467
Commit: 16d483f9003ddee71404f37ce7694003d1a18ac4
Parents: 56830c3
Author: Yi Liang <yliang@us.ibm.com>
Authored: Wed Sep 13 11:38:29 2017 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Tue Oct 3 17:11:06 2017 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/TableInputFormatBase.java   | 473 ++++++++++---------
 .../mapreduce/TestTableInputFormatScan1.java    | 103 +---
 .../mapreduce/TestTableInputFormatScanBase.java |  83 +++-
 3 files changed, 324 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
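
For context, a minimal sketch of how a job could use the new knobs once this patch is in.
The property keys are the ones introduced below; the table name, mapper class, and job name
are placeholders, and only one of the two mechanisms should be set at a time, since the
per-region key disables autobalance:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.mapreduce.Job;

    public class SplitConfigSketch {
      // Placeholder mapper; a real job would do actual work in map().
      static class MyMapper extends TableMapper<ImmutableBytesWritable, Result> {
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Option 1: fixed fan-out, three mappers for every region.
        // Setting this key disables the autobalance path entirely.
        conf.setInt("hbase.mapreduce.tableinput.mappers.per.region", 3);
        // Option 2 (instead of option 1): balance mappers by average region
        // size, capping the average used in the calculation at 1 GB.
        // conf.setBoolean("hbase.mapreduce.tif.input.autobalance", true);
        // conf.setLong("hbase.mapreduce.tif.ave.regionsize", 1024L * 1024 * 1024);
        Job job = Job.getInstance(conf, "scan-with-custom-splits");
        TableMapReduceUtil.initTableMapperJob("myTable", new Scan(), MyMapper.class,
            ImmutableBytesWritable.class, Result.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }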


http://git-wip-us.apache.org/repos/asf/hbase/blob/16d483f9/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index b3be90b..e7a65e8 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -24,13 +24,12 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -93,20 +93,21 @@ import org.apache.hadoop.util.StringUtils;
  *     }
  *   }
  * </pre>
+ *
+ *
+ * By default the number of InputSplits (mappers) matches the number of regions in a table.
+ * Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers to use
+ * per region; setting this property disables the autobalance behavior described below.
+ * Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance: HBase will assign
+ * mappers based on the average region size. Regions larger than the average may be assigned
+ * more mappers, while smaller ones may be grouped together to share one mapper. If the
+ * actual average region size is very large, say 50G, it is not good to assign only one
+ * mapper to such large regions. Use "hbase.mapreduce.tif.ave.regionsize" to cap the average
+ * region size used when autobalance is enabled; the default cap on the average region
+ * size is 8G.
  */
 @InterfaceAudience.Public
 public abstract class TableInputFormatBase
-extends InputFormat<ImmutableBytesWritable, Result> {
-
-  /** Specify if we enable auto-balance for input in M/R jobs.*/
-  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
-  /** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce
-   * .input.autobalance property.*/
-  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
-          ".maxskewratio";
-  /** Specify if the row key in table is text (ASCII between 32~126),
-   * default is true. False means the table is using binary row key*/
-  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
+    extends InputFormat<ImmutableBytesWritable, Result> {
 
   private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
@@ -114,8 +115,17 @@ extends InputFormat<ImmutableBytesWritable, Result> {
       "initialized. Ensure you call initializeTable either in your constructor or initialize
" +
       "method";
   private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
-            " previous error. Please look at the previous logs lines from" +
-            " the task's full log for more details.";
+      " previous error. Please look at the previous logs lines from" +
+      " the task's full log for more details.";
+
+  /** Specify if we enable auto-balance to set number of mappers in M/R jobs. */
+  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
+  /** In auto-balance, input is split by average region size; if the calculated average is too big, set this to cap it. */
+  public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";
+
+  /** Set the number of Mappers for each region; all regions get the same number of Mappers. */
+  public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region";
+
 
   /** Holds the details for the internal scanner.
    *
@@ -134,7 +144,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
 
 
   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
-  private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();
+  private HashMap<InetAddress, String> reverseDNSCacheMap =
+      new HashMap<>();
 
   /**
    * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
@@ -151,7 +162,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   @Override
   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
       InputSplit split, TaskAttemptContext context)
-  throws IOException {
+      throws IOException {
     // Just in case a subclass is relying on JobConfigurable magic.
     if (table == null) {
       initialize(context);
@@ -215,9 +226,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
 
   /**
-   * Calculates the splits that will serve as input for the map tasks. The
-   * number of splits matches the number of regions in a table.
-   *
+   * Calculates the splits that will serve as input for the map tasks.
    * @param context  The current job context.
    * @return The list of input splits.
    * @throws IOException When creating the list of splits fails.
@@ -245,269 +254,263 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
 
     try {
-      RegionSizeCalculator sizeCalculator =
-          new RegionSizeCalculator(getRegionLocator(), getAdmin());
-
-      TableName tableName = getTable().getName();
-
-      Pair<byte[][], byte[][]> keys = getStartEndKeys();
-      if (keys == null || keys.getFirst() == null ||
-          keys.getFirst().length == 0) {
-        HRegionLocation regLoc =
-            getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
-        if (null == regLoc) {
-          throw new IOException("Expecting at least one region.");
+      List<InputSplit> splits = oneInputSplitPerRegion();
+
+      // set same number of mappers for each region
+      if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
+        int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
+        List<InputSplit> res = new ArrayList<>();
+        for (int i = 0; i < splits.size(); i++) {
+          List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
+          res.addAll(tmp);
         }
-        List<InputSplit> splits = new ArrayList<>(1);
-        long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
-        TableSplit split = new TableSplit(tableName, scan,
-            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
-                .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
-        splits.add(split);
-        return splits;
+        return res;
+      }
+
+      // The default value of "hbase.mapreduce.tif.input.autobalance" is false.
+      if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
+        long maxAveRegionSize = context.getConfiguration().getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824);
+        return calculateAutoBalancedSplits(splits, maxAveRegionSize);
       }
-      List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
-      for (int i = 0; i < keys.getFirst().length; i++) {
-        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
-          continue;
-        }
 
-        byte[] startRow = scan.getStartRow();
-        byte[] stopRow = scan.getStopRow();
-        // determine if the given start an stop key fall into the region
-        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-            Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
-            (stopRow.length == 0 ||
-             Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-          byte[] splitStart = startRow.length == 0 ||
+      // return one mapper per region
+      return splits;
+    } finally {
+      if (closeOnFinish) {
+        closeTable();
+      }
+    }
+  }
+
+  /**
+   * Create one InputSplit per region
+   *
+   * @return The list of InputSplit for all the regions
+   * @throws IOException
+   */
+  private List<InputSplit> oneInputSplitPerRegion() throws IOException {
+    RegionSizeCalculator sizeCalculator =
+        new RegionSizeCalculator(getRegionLocator(), getAdmin());
+
+    TableName tableName = getTable().getName();
+
+    Pair<byte[][], byte[][]> keys = getStartEndKeys();
+    if (keys == null || keys.getFirst() == null ||
+        keys.getFirst().length == 0) {
+      HRegionLocation regLoc =
+          getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+      if (null == regLoc) {
+        throw new IOException("Expecting at least one region.");
+      }
+      List<InputSplit> splits = new ArrayList<>(1);
+      long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
+      TableSplit split = new TableSplit(tableName, scan,
+          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+          .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
+      splits.add(split);
+      return splits;
+    }
+    List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
+    for (int i = 0; i < keys.getFirst().length; i++) {
+      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+        continue;
+      }
+
+      byte[] startRow = scan.getStartRow();
+      byte[] stopRow = scan.getStopRow();
+      // determine if the given start and stop keys fall into the region
+      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+          (stopRow.length == 0 ||
+              Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+        byte[] splitStart = startRow.length == 0 ||
             Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-              keys.getFirst()[i] : startRow;
-          byte[] splitStop = (stopRow.length == 0 ||
+            keys.getFirst()[i] : startRow;
+        byte[] splitStop = (stopRow.length == 0 ||
             Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
             keys.getSecond()[i].length > 0 ?
-              keys.getSecond()[i] : stopRow;
+            keys.getSecond()[i] : stopRow;
 
-          HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
-          // The below InetSocketAddress creation does a name resolution.
-          InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
-          if (isa.isUnresolved()) {
-            LOG.warn("Failed resolve " + isa);
-          }
-          InetAddress regionAddress = isa.getAddress();
-          String regionLocation;
-          regionLocation = reverseDNS(regionAddress);
-
-          byte[] regionName = location.getRegionInfo().getRegionName();
-          String encodedRegionName = location.getRegionInfo().getEncodedName();
-          long regionSize = sizeCalculator.getRegionSize(regionName);
-          TableSplit split = new TableSplit(tableName, scan,
-            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
-          splits.add(split);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("getSplits: split -> " + i + " -> " + split);
-          }
-        }
-      }
-      //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
-      boolean enableAutoBalance = context.getConfiguration()
-        .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
-      if (enableAutoBalance) {
-        long totalRegionSize=0;
-        for (int i = 0; i < splits.size(); i++){
-          TableSplit ts = (TableSplit)splits.get(i);
-          totalRegionSize += ts.getLength();
+        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
+        // The below InetSocketAddress creation does a name resolution.
+        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
+        if (isa.isUnresolved()) {
+          LOG.warn("Failed resolve " + isa);
         }
-        long averageRegionSize = totalRegionSize / splits.size();
-        // the averageRegionSize must be positive.
-        if (averageRegionSize <= 0) {
-            LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
-                    "set it to 1.");
-            averageRegionSize = 1;
+        InetAddress regionAddress = isa.getAddress();
+        String regionLocation;
+        regionLocation = reverseDNS(regionAddress);
+
+        byte[] regionName = location.getRegionInfo().getRegionName();
+        String encodedRegionName = location.getRegionInfo().getEncodedName();
+        long regionSize = sizeCalculator.getRegionSize(regionName);
+        TableSplit split = new TableSplit(tableName, scan,
+            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
+        splits.add(split);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("getSplits: split -> " + i + " -> " + split);
         }
-        return calculateRebalancedSplits(splits, context, averageRegionSize);
-      } else {
-        return splits;
-      }
-    } finally {
-      if (closeOnFinish) {
-        closeTable();
       }
     }
+    return splits;
   }
 
-  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
-    String hostName = this.reverseDNSCacheMap.get(ipAddress);
-    if (hostName == null) {
-      String ipAddressString = null;
-      try {
-        ipAddressString = DNS.reverseDns(ipAddress, null);
-      } catch (Exception e) {
-        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
-        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
-        // reverse DNS using jndi doesn't work well with ipv6 addresses.
-        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
+  /**
+   * Create n splits for one InputSplit. For now only uniform distribution is supported.
+   * @param split A TableSplit corresponding to a range of rowkeys
+   * @param n     Number of ranges after splitting. Pass 1 to leave the range unsplit;
+   *              pass 2 to split the range in two.
+   * @return A list of TableSplit, the size of the list is n
+   * @throws IllegalArgumentIOException
+   */
+  protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
+      throws IllegalArgumentIOException {
+    if (split == null || !(split instanceof TableSplit)) {
+      throw new IllegalArgumentIOException(
+          "InputSplit for CreateNSplitsPerRegion can not be null + "
+              + "and should be instance of TableSplit");
+    }
+    //if n < 1, then still continue using n = 1
+    n = n < 1 ? 1 : n;
+    List<InputSplit> res = new ArrayList<>(n);
+    if (n == 1) {
+      res.add(split);
+      return res;
+    }
+
+    // Collect Region related information
+    TableSplit ts = (TableSplit) split;
+    TableName tableName = ts.getTable();
+    String regionLocation = ts.getRegionLocation();
+    String encodedRegionName = ts.getEncodedRegionName();
+    long regionSize = ts.getLength();
+    byte[] startRow = ts.getStartRow();
+    byte[] endRow = ts.getEndRow();
+
+    // Special cases: startRow and/or endRow is empty
+    if (startRow.length == 0 && endRow.length == 0){
+      startRow = new byte[1];
+      endRow = new byte[1];
+      startRow[0] = 0;
+      endRow[0] = -1;
+    }
+    if (startRow.length == 0 && endRow.length != 0){
+      startRow = new byte[1];
+      startRow[0] = 0;
+    }
+    if (startRow.length != 0 && endRow.length == 0){
+      endRow =new byte[startRow.length];
+      for (int k = 0; k < startRow.length; k++){
+        endRow[k] = -1;
       }
-      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
-      hostName = Strings.domainNamePointerToHostName(ipAddressString);
-      this.reverseDNSCacheMap.put(ipAddress, hostName);
     }
-    return hostName;
-  }
 
+    // Split Region into n chunks evenly
+    byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
+    for (int i = 0; i < splitKeys.length - 1; i++) {
+      // notice that the regionSize parameter may not be very accurate
+      TableSplit tsplit =
+          new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
+              encodedRegionName, regionSize / n);
+      res.add(tsplit);
+    }
+    return res;
+  }
   /**
    * Calculates the number of MapReduce input splits for the map tasks. The number of
-   * MapReduce input splits depends on the average region size and the "data skew ratio" user set in
-   * configuration.
+   * MapReduce input splits depends on the average region size.
+   * Made 'public' for testing.
    *
-   * @param list  The list of input splits before balance.
-   * @param context  The current job context.
-   * @param average  The average size of all regions .
+   * @param splits The list of input splits before balance.
+   * @param maxAverageRegionSize max Average region size for one mapper
    * @return The list of input splits.
    * @throws IOException When creating the list of splits fails.
    * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
-   *   org.apache.hadoop.mapreduce.JobContext)
+   *   org.apache.hadoop.mapreduce.JobContext)
    */
-  private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
-                                               long average) throws IOException {
+  public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize)
+      throws IOException {
+    if (splits.size() == 0) {
+      return splits;
+    }
     List<InputSplit> resultList = new ArrayList<>();
-    Configuration conf = context.getConfiguration();
-    //The default data skew ratio is 3
-    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
-    //It determines which mode to use: text key mode or binary key mode. The default is text mode.
-    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
-    long dataSkewThreshold = dataSkewRatio * average;
-    int count = 0;
-    while (count < list.size()) {
-      TableSplit ts = (TableSplit)list.get(count);
+    long totalRegionSize = 0;
+    for (int i = 0; i < splits.size(); i++) {
+      TableSplit ts = (TableSplit) splits.get(i);
+      totalRegionSize += ts.getLength();
+    }
+    long averageRegionSize = totalRegionSize / splits.size();
+    // totalRegionSize might overflow, and the averageRegionSize must be positive.
+    if (averageRegionSize <= 0) {
+      LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
+          "set it to Long.MAX_VALUE " + splits.size());
+      averageRegionSize = Long.MAX_VALUE / splits.size();
+    }
+    // if averageRegionSize is too big, cap it at maxAverageRegionSize
+    if (averageRegionSize > maxAverageRegionSize) {
+      averageRegionSize = maxAverageRegionSize;
+    }
+    // if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' regions;
+    // set the floor to 16M = (default hdfs block size) / 4
+    if (averageRegionSize < 16 * 1048576) {
+      return splits;
+    }
+    for (int i = 0; i < splits.size(); i++) {
+      TableSplit ts = (TableSplit) splits.get(i);
       TableName tableName = ts.getTable();
       String regionLocation = ts.getRegionLocation();
       String encodedRegionName = ts.getEncodedRegionName();
       long regionSize = ts.getLength();
-      if (regionSize >= dataSkewThreshold) {
-        // if the current region size is large than the data skew threshold,
-        // split the region into two MapReduce input splits.
-        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
-        if (Arrays.equals(ts.getEndRow(), splitKey)) {
-          // Not splitting since the end key is the same as the split key
-          resultList.add(ts);
-        } else {
-          //Set the size of child TableSplit as 1/2 of the region size. The exact size of the
-          // MapReduce input splits is not far off.
-          TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey,
-              regionLocation, regionSize / 2);
-          TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
-              regionSize - regionSize / 2);
-          resultList.add(t1);
-          resultList.add(t2);
-        }
-        count++;
-      } else if (regionSize >= average) {
-        // if the region size between average size and data skew threshold size,
-        // make this region as one MapReduce input split.
-        resultList.add(ts);
-        count++;
+
+      if (regionSize >= averageRegionSize) {
+        // split this region into multiple MapReduce input splits.
+        int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
+        List<InputSplit> temp = createNInputSplitsUniform(ts, n);
+        resultList.addAll(temp);
       } else {
         // if the total size of several small continuous regions less than the average region size,
         // combine them into one MapReduce input split.
         long totalSize = regionSize;
         byte[] splitStartKey = ts.getStartRow();
         byte[] splitEndKey = ts.getEndRow();
-        count++;
-        for (; count < list.size(); count++) {
-          TableSplit nextRegion = (TableSplit)list.get(count);
+        int j = i + 1;
+        while (j < splits.size()) {
+          TableSplit nextRegion = (TableSplit) splits.get(j);
           long nextRegionSize = nextRegion.getLength();
-          if (totalSize + nextRegionSize <= dataSkewThreshold) {
+          if (totalSize + nextRegionSize <= averageRegionSize) {
             totalSize = totalSize + nextRegionSize;
             splitEndKey = nextRegion.getEndRow();
+            j++;
           } else {
             break;
           }
         }
-        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey,
-                regionLocation, encodedRegionName, totalSize);
+        i = j - 1;
+        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
+            encodedRegionName, totalSize);
         resultList.add(t);
       }
     }
     return resultList;
   }
 
-  /**
-   * select a split point in the region. The selection of the split point is based on an uniform
-   * distribution assumption for the keys in a region.
-   * Here are some examples:
-   *
-   * <table>
-   *   <tr>
-   *     <th>start key</th>
-   *     <th>end key</th>
-   *     <th>is text</th>
-   *     <th>split point</th>
-   *   </tr>
-   *   <tr>
-   *     <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td>
-   *     <td>'a', 'a', 'a', 'f', 'f', 'f'</td>
-   *     <td>true</td>
-   *     <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td>
-   *   </tr>
-   *   <tr>
-   *     <td>'1', '1', '1', '0', '0', '0'</td>
-   *     <td>'1', '1', '2', '5', '7', '9', '0'</td>
-   *     <td>true</td>
-   *     <td>'1', '1', '1', -78, -77, -76, -104</td>
-   *   </tr>
-   *   <tr>
-   *     <td>'1', '1', '1', '0'</td>
-   *     <td>'1', '1', '2', '0'</td>
-   *     <td>true</td>
-   *     <td>'1', '1', '1', -80</td>
-   *   </tr>
-   *   <tr>
-   *     <td>13, -19, 126, 127</td>
-   *     <td>13, -19, 127, 0</td>
-   *     <td>false</td>
-   *     <td>13, -19, 126, -65</td>
-   *   </tr>
-   * </table>
-   *
-   * Set this function as "public static", make it easier for test.
-   *
-   * @param start Start key of the region
-   * @param end End key of the region
-   * @param isText It determines to use text key mode or binary key mode
-   * @return The split point in the region.
-   */
-  @InterfaceAudience.Private
-  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
-    byte upperLimitByte;
-    byte lowerLimitByte;
-    //Use text mode or binary mode.
-    if (isText) {
-      //The range of text char set in ASCII is [32,126], the lower limit is space and the upper
-      // limit is '~'.
-      upperLimitByte = '~';
-      lowerLimitByte = ' ';
-    } else {
-      upperLimitByte = -1;
-      lowerLimitByte = 0;
-    }
-    // For special case
-    // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
-    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
-    if (start.length == 0 && end.length == 0){
-      return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
-    }
-    if (start.length == 0 && end.length != 0){
-      return new byte[]{ end[0] };
-    }
-    if (start.length != 0 && end.length == 0){
-      byte[] result =new byte[start.length];
-      result[0]=start[0];
-      for (int k = 1; k < start.length; k++){
-          result[k] = upperLimitByte;
+  String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
+    String hostName = this.reverseDNSCacheMap.get(ipAddress);
+    if (hostName == null) {
+      String ipAddressString = null;
+      try {
+        ipAddressString = DNS.reverseDns(ipAddress, null);
+      } catch (Exception e) {
+        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
+        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
+        // reverse DNS using jndi doesn't work well with ipv6 addresses.
+        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
       }
-      return result;
+      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
+      hostName = Strings.domainNamePointerToHostName(ipAddressString);
+      this.reverseDNSCacheMap.put(ipAddress, hostName);
     }
-    return Bytes.split(start, end, false, 1)[1];
+    return hostName;
   }
 
   /**
@@ -649,4 +652,4 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
   }
 
-}
+}
\ No newline at end of file
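
The core of the autobalance path above is the fan-out rule in calculateAutoBalancedSplits:
a region larger than the (capped) average gets round(ln(regionSize / averageRegionSize) + 1)
mappers, runs of small neighboring regions are merged until they reach the average, and the
actual key-range division is delegated to Bytes.split via createNInputSplitsUniform. A
standalone sketch of just that formula, with illustrative sizes that are not taken from the
patch:

    public class FanOutSketch {
      // Mirrors the expression in calculateAutoBalancedSplits:
      //   n = round(ln(regionSize / averageRegionSize) + 1)
      static int mappersFor(long regionSize, long averageRegionSize) {
        return (int) Math.round(
            Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
      }

      public static void main(String[] args) {
        long avgMb = 30; // illustrative average region size in MB
        // Fan-out grows logarithmically, not linearly, with region size:
        // 30 MB (1x avg) -> 1 mapper, 60 MB (2x) -> 2, 240 MB (8x) -> 3.
        for (long mb : new long[] {30, 60, 240}) {
          System.out.println(mb + " MB -> " + mappersFor(mb, avgMb) + " mapper(s)");
        }
      }
    }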

http://git-wip-us.apache.org/repos/asf/hbase/blob/16d483f9/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
index 99b40b9..553869e 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
@@ -98,103 +98,38 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
   }
 
   /**
-   * Tests a MR scan using specific number of mappers. The test table has 25 regions,
-   * and all region sizes are set as 0 as default. The average region size is 1 (the smallest
-   * positive). When we set hbase.mapreduce.input.ratio as -1, all regions will be cut into two
-   * MapRedcue input splits, the number of MR input splits should be 50; when we set hbase
-   * .mapreduce.input.ratio as 100, the sum of all region sizes is less then the average region
-   * size, all regions will be combined into 1 MapRedcue input split.
+   * Tests a MR scan using a specific number of mappers. The test table has 26 regions.
    *
    * @throws IOException
    * @throws ClassNotFoundException
    * @throws InterruptedException
    */
   @Test
-  public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
-    testNumOfSplits("-1", 52);
-    testNumOfSplits("100", 1);
-  }
+  public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
+    testNumOfSplits(1, 26);
+    testNumOfSplits(3, 78);
+  }
 
   /**
-   * Tests the getSplitKey() method in TableInputFormatBase.java
-   *
+   * Runs a MR job to test TIF using a specific number of mappers. The test table has 26 regions.
+   * @throws InterruptedException
    * @throws IOException
    * @throws ClassNotFoundException
-   * @throws InterruptedException
    */
   @Test
-  public void testGetSplitsPoint() throws IOException, InterruptedException,
-          ClassNotFoundException {
-    byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' };
-    byte[] end1 = { 'a', 'a', 'a', 'f', 'f' };
-    byte[] splitPoint1 = { 'a', 'a', 'a', 'd', 'd', -78, 50, -77  };
-    testGetSplitKey(start1, end1, splitPoint1, true);
-
-    byte[] start2 = { '1', '1', '1', '0', '0', '0' };
-    byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' };
-    byte[] splitPoint2 = { '1', '1', '1',  -78, -77, -76, -104 };
-    testGetSplitKey(start2, end2, splitPoint2, true);
-
-    byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' };
-    byte[] end3 = { 'a', 'a', 'b' };
-    byte[] splitPoint3 = { 'a', 'a', 'a', -80, -80, -80 };
-    testGetSplitKey(start3, end3, splitPoint3, true);
-
-    byte[] start4 = { 'a', 'a', 'a' };
-    byte[] end4 = { 'a', 'a', 'a', 'z' };
-    byte[] splitPoint4 = { 'a', 'a', 'a', '=' };
-    testGetSplitKey(start4, end4, splitPoint4, true);
-
-    byte[] start5 = { 'a', 'a', 'a' };
-    byte[] end5 = { 'a', 'a', 'b', 'a' };
-    byte[] splitPoint5 = { 'a', 'a', 'a', -80 };
-    testGetSplitKey(start5, end5, splitPoint5, true);
-
-    // Test Case 6: empty key and "hhhqqqwww", split point is "h"
-    byte[] start6 = {};
-    byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' };
-    byte[] splitPointText6 = { 'h' };
-    byte[] splitPointBinary6 = { 104 };
-    testGetSplitKey(start6, end6, splitPointText6, true);
-    testGetSplitKey(start6, end6, splitPointBinary6, false);
-
-    // Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose(text key or
-    // binary key).
-    byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' };
-    byte[] end7 = {};
-    byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~'  };
-    byte[] splitPointBinary7 = { 'f', -1, -1, -1, -1, -1, -1  };
-    testGetSplitKey(start7, end7, splitPointText7, true);
-    testGetSplitKey(start7, end7, splitPointBinary7, false);
-
-    // Test Case 8: both start key and end key are empty. Split point depends on the mode we
-    // choose (text key or binary key).
-    byte[] start8 = {};
-    byte[] end8 = {};
-    byte[] splitPointText8 = { 'O' };
-    byte[] splitPointBinary8 = { 0 };
-    testGetSplitKey(start8, end8, splitPointText8, true);
-    testGetSplitKey(start8, end8, splitPointBinary8, false);
-
-    // Test Case 9: Binary Key example
-    byte[] start9 = { 13, -19, 126, 127 };
-    byte[] end9 = { 13, -19, 127, 0 };
-    byte[] splitPoint9 = { 13, -19, 126, -65 };
-    testGetSplitKey(start9, end9, splitPoint9, false);
-
-    // Test Case 10: Binary key split when the start key is an unsigned byte and the end byte is a
-    // signed byte
-    byte[] start10 = { 'x' };
-    byte[] end10 = { -128 };
-    byte[] splitPoint10 = { '|' };
-    testGetSplitKey(start10, end10, splitPoint10, false);
+  public void testSpecifiedNumOfMappersMR()
+      throws InterruptedException, IOException, ClassNotFoundException {
+    testNumOfSplitsMR(2, 52);
+    testNumOfSplitsMR(4, 104);
+  }
 
-    // Test Case 11: Binary key split when the start key is an signed byte and the end byte is a
-    // signed byte
-    byte[] start11 = { -100 };
-    byte[] end11 = { -90 };
-    byte[] splitPoint11 = { -95 };
-    testGetSplitKey(start11, end11, splitPoint11, false);
+  /**
+   * Test if autoBalance creates correct splits
+   * @throws IOException
+   */
+  @Test
+  public void testAutoBalanceSplits() throws IOException {
+    testAutobalanceNumOfSplit();
   }
 
 }
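
For reference, the expected counts above follow directly from the 26-region test table:
testNumOfSplits asks for 1 and 3 mappers per region (26 x 1 = 26 and 26 x 3 = 78 splits),
and the MR variant asks for 2 and 4 (26 x 2 = 52 and 26 x 4 = 104 launched map tasks).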

http://git-wip-us.apache.org/repos/asf/hbase/blob/16d483f9/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
index 13b6a96..d127adb 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
@@ -19,13 +19,11 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.NavigableMap;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,10 +37,13 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -134,7 +135,7 @@ public abstract class TestTableInputFormatScanBase {
    */
   public static class ScanReducer
   extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
-                  NullWritable, NullWritable> {
+                    NullWritable, NullWritable> {
 
     private String first = null;
     private String last = null;
@@ -247,28 +248,28 @@ public abstract class TestTableInputFormatScanBase {
 
 
   /**
-   * Tests a MR scan using data skew auto-balance
+   * Tests the number of InputSplits for a MR job when specifying the number of mappers
+   * per region for TableInputFormatXXX. This test does not run a MR job.
    *
    * @throws IOException
    * @throws ClassNotFoundException
    * @throws InterruptedException
    */
-  public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
-          InterruptedException,
-          ClassNotFoundException {
+  public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
+      InterruptedException,
+      ClassNotFoundException {
     String jobName = "TestJobForNumOfSplits";
     LOG.info("Before map/reduce startup - job " + jobName);
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     Scan scan = new Scan();
     scan.addFamily(INPUT_FAMILYS[0]);
     scan.addFamily(INPUT_FAMILYS[1]);
-    c.set("hbase.mapreduce.input.autobalance", "true");
-    c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
+    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
     c.set(KEY_STARTROW, "");
     c.set(KEY_LASTROW, "");
     Job job = new Job(c, jobName);
     TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
-            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
     TableInputFormat tif = new TableInputFormat();
     tif.setConf(job.getConfiguration());
     Assert.assertEquals(TABLE_NAME, table.getName());
@@ -277,11 +278,61 @@ public abstract class TestTableInputFormatScanBase {
   }
 
   /**
-   * Tests for the getSplitKey() method in TableInputFormatBase.java
+   * Runs a MR job to check that the number of mappers equals expectedNumOfSplits.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
    */
-  public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
-    byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
-      Assert.assertArrayEquals(splitKey, result);
+  public void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
+      InterruptedException,
+      ClassNotFoundException {
+    String jobName = "TestJobForNumOfSplits-MR";
+    LOG.info("Before map/reduce startup - job " + jobName);
+    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILYS[0]);
+    scan.addFamily(INPUT_FAMILYS[1]);
+    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
+    c.set(KEY_STARTROW, "");
+    c.set(KEY_LASTROW, "");
+    Job job = Job.getInstance(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    assertTrue("job failed!", job.waitForCompletion(true));
+    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
+    // so we use TaskCounter.SHUFFLED_MAPS to get the total launched maps
+    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
+        job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
+  }
+
+  /**
+   * Tests autobalance for setting the number of mappers for TIF.
+   * This does not run a real MR job.
+   */
+  public void testAutobalanceNumOfSplit() throws IOException {
+    // set up splits for testing
+    List<InputSplit> splits = new ArrayList<>(5);
+    int[] regionLen = {10, 20, 20, 40, 60};
+    for (int i = 0; i < 5; i++) {
+      InputSplit split = new TableSplit(TABLE_NAME, new Scan(),
+          Bytes.toBytes(i), Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
+      splits.add(split);
+    }
+    TableInputFormat tif = new TableInputFormat();
+    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
+
+    assertEquals("Saw the wrong number of splits", 5, res.size());
+    TableSplit ts1 = (TableSplit) res.get(0);
+    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
+    TableSplit ts2 = (TableSplit) res.get(1);
+    assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength());
+    TableSplit ts3 = (TableSplit) res.get(2);
+    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
+    TableSplit ts4 = (TableSplit) res.get(4);
+    assertNotEquals("The fifth split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
   }
 }
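
To see why testAutobalanceNumOfSplit expects exactly five splits: the synthetic regions are
10, 20, 20, 40, and 60 MB, so the average is 30 MB (under the 1 GB cap and above the 16 MB
floor). The 10 MB and first 20 MB regions combine (10 + 20 <= 30) into one split ending at
key 2; the second 20 MB region cannot absorb its 40 MB neighbor, so it stands alone with
length 20 * 1048576; the 40 MB region gets round(ln(40/30) + 1) = 1 split starting at key 3;
and the 60 MB region gets round(ln(2) + 1) = 2 splits, so the last split starts at the
computed midpoint of its key range rather than at key 4, which is what the final
assertNotEquals checks.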
 

