hbase-commits mailing list archives

From apurt...@apache.org
Subject [4/4] hbase git commit: HBASE-16894 Create more than 1 split per region, generalize HBASE-12590
Date Wed, 04 Oct 2017 01:15:57 GMT
HBASE-16894 Create more than 1 split per region, generalize HBASE-12590

Signed-off-by: Andrew Purtell <apurtell@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cbbcb2db
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cbbcb2db
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cbbcb2db

Branch: refs/heads/branch-1.4
Commit: cbbcb2db2f0a94382cb33fef826cbf1a00b5de6e
Parents: 5c1f2d6
Author: Yi Liang <yliang@us.ibm.com>
Authored: Fri Sep 29 15:12:03 2017 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Tue Oct 3 17:29:47 2017 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/TableInputFormatBase.java   | 406 ++++++++++---------
 .../mapreduce/TestTableInputFormatScan1.java    | 103 +----
 .../mapreduce/TestTableInputFormatScanBase.java |  82 +++-
 .../hbase/namespace/TestNamespaceAuditor.java   |   4 +-
 4 files changed, 289 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cbbcb2db/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 148f367..661b981 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -99,23 +100,25 @@ import org.apache.hadoop.util.StringUtils;
  *       setScan(scan);
  *     }
  *   }
+ *
+ * By default, the number of InputSplits (mappers) matches the number of regions in the table.
+ * Set "hbase.mapreduce.input.mappers.per.region" to specify how many mappers to create per
+ * region; setting this property disables the autobalance behavior described below.
+ *
+ * Set "hbase.mapreduce.input.autobalance" to enable autobalance: HBase then assigns mappers
+ * based on the average region size. Regions larger than the average may be assigned more
+ * mappers, while runs of contiguous small regions may be grouped together to share one mapper.
+ * If the calculated average region size is too big, assigning only one mapper to each large
+ * region is undesirable, so use "hbase.mapreduce.input.average.regionsize" to cap the average
+ * region size used when autobalance is enabled; the default cap is 8 GB.
  * </pre>
+ *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class TableInputFormatBase
 extends InputFormat<ImmutableBytesWritable, Result> {
 
-  /** Specify if we enable auto-balance for input in M/R jobs.*/
-  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
-  /** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce
-   * .input.autobalance property.*/
-  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
-          ".maxskewratio";
-  /** Specify if the row key in table is text (ASCII between 32~126),
-   * default is true. False means the table is using binary row key*/
-  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
-
   private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
@@ -125,6 +128,14 @@ extends InputFormat<ImmutableBytesWritable, Result> {
             " previous error. Please look at the previous logs lines from" +
             " the task's full log for more details.";
 
+  /** Specify if we enable auto-balance to set the number of mappers in M/R jobs. */
+  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
+  /** In auto-balance, the input is split by average region size; if the calculated average is
+   * too big, set this property to cap it. */
+  public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.input.average.regionsize";
+
+  /** Set the number of mappers for each region; all regions get the same number of mappers. */
+  public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.input.mappers.per.region";
+
   /** Holds the details for the internal scanner.
    *
    * @see Scan */
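
For reference, a minimal sketch of how a MapReduce job might opt into the properties above
(the table name "mytable", the NoopMapper class, and the job name are hypothetical; the
property keys are the ones this patch introduces, and the TableMapReduceUtil call is the
standard HBase job setup):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class MoreSplitsJob {
  // Pass-through mapper, just to have something runnable to hang the job on.
  static class NoopMapper extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Fixed fan-out: four mappers per region; setting this disables autobalance.
    conf.setInt("hbase.mapreduce.input.mappers.per.region", 4);
    // Alternatively, balance mappers by average region size, capping the average
    // used for balancing at 4 GB instead of the default 8 GB:
    // conf.setBoolean("hbase.mapreduce.input.autobalance", true);
    // conf.setLong("hbase.mapreduce.input.average.regionsize", 4L * 1024 * 1024 * 1024);
    Job job = Job.getInstance(conf, "scan-with-more-splits");
    TableMapReduceUtil.initTableMapperJob("mytable", new Scan(), NoopMapper.class,
        ImmutableBytesWritable.class, Result.class, job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
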
@@ -140,7 +151,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /** The underlying {@link Connection} of the table. */
   private Connection connection;
 
-  
+
   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
   private HashMap<InetAddress, String> reverseDNSCacheMap =
     new HashMap<InetAddress, String>();
@@ -252,28 +263,68 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     } catch (IllegalStateException exception) {
       throw new IOException(INITIALIZATION_ERROR, exception);
     }
-
     try {
-    RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);
+      List<InputSplit> splits = oneInputSplitPerRegion();
+
+      // set same number of mappers for each region
+      if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
+        int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
+        List<InputSplit> res = new ArrayList<>();
+        for (int i = 0; i < splits.size(); i++) {
+          List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
+          res.addAll(tmp);
+        }
+        return res;
+      }
+
+      // The default value of "hbase.mapreduce.input.autobalance" is false.
+      if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
+        long maxAveRegionSize =
+            context.getConfiguration().getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824);
+        return calculateAutoBalancedSplits(splits, maxAveRegionSize);
+      }
+
+      // return one mapper per region
+      return splits;
+    } catch (NamingException e) {
+      throw new IOException(e);
+    } finally {
+      if (closeOnFinish) {
+        closeTable();
+      }
+    }
+  }
+
+  /**
+   * Create one InputSplit per region
+   *
+   * @return The list of InputSplit for all the regions
+   * @throws IOException
+   */
+  private List<InputSplit> oneInputSplitPerRegion() throws IOException, NamingException {
+    RegionSizeCalculator sizeCalculator =
+        new RegionSizeCalculator(getRegionLocator(), getAdmin());
+
+    TableName tableName = getTable().getName();
 
     Pair<byte[][], byte[][]> keys = getStartEndKeys();
     if (keys == null || keys.getFirst() == null ||
         keys.getFirst().length == 0) {
-      HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+      HRegionLocation regLoc =
+          getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
       if (null == regLoc) {
         throw new IOException("Expecting at least one region.");
       }
-      List<InputSplit> splits = new ArrayList<InputSplit>(1);
+      List<InputSplit> splits = new ArrayList<>(1);
       long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
-      TableSplit split = new TableSplit(table.getName(), scan,
+      TableSplit split = new TableSplit(tableName, scan,
           HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
-              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
+          .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
       splits.add(split);
       return splits;
     }
-    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
+    List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
     for (int i = 0; i < keys.getFirst().length; i++) {
-      if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
         continue;
       }
 
@@ -283,16 +334,16 @@ extends InputFormat<ImmutableBytesWritable, Result> {
       if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
           (stopRow.length == 0 ||
-           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+              Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
         byte[] splitStart = startRow.length == 0 ||
-          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
             keys.getFirst()[i] : startRow;
         byte[] splitStop = (stopRow.length == 0 ||
-          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-          keys.getSecond()[i].length > 0 ?
+            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+            keys.getSecond()[i].length > 0 ?
             keys.getSecond()[i] : stopRow;
 
-        HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false);
+        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
         // The below InetSocketAddress creation does a name resolution.
         InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
         if (isa.isUnresolved()) {
@@ -300,145 +351,155 @@ extends InputFormat<ImmutableBytesWritable, Result> {
         }
         InetAddress regionAddress = isa.getAddress();
         String regionLocation;
-        try {
-          regionLocation = reverseDNS(regionAddress);
-        } catch (NamingException e) {
-          LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " +
e);
-          regionLocation = location.getHostname();
-        }
+        regionLocation = reverseDNS(regionAddress);
 
         byte[] regionName = location.getRegionInfo().getRegionName();
         String encodedRegionName = location.getRegionInfo().getEncodedName();
         long regionSize = sizeCalculator.getRegionSize(regionName);
-        TableSplit split = new TableSplit(table.getName(), scan,
-          splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
+        TableSplit split = new TableSplit(tableName, scan,
+            splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
         splits.add(split);
         if (LOG.isDebugEnabled()) {
           LOG.debug("getSplits: split -> " + i + " -> " + split);
         }
       }
     }
-    //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
-    boolean enableAutoBalance = context.getConfiguration().getBoolean(
-      MAPREDUCE_INPUT_AUTOBALANCE, false);
-    if (enableAutoBalance) {
-      long totalRegionSize=0;
-      for (int i = 0; i < splits.size(); i++){
-        TableSplit ts = (TableSplit)splits.get(i);
-        totalRegionSize += ts.getLength();
-      }
-      long averageRegionSize = totalRegionSize / splits.size();
-      // the averageRegionSize must be positive.
-      if (averageRegionSize <= 0) {
-        LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
-            "set it to 1.");
-        averageRegionSize = 1;
-      }
-      return calculateRebalancedSplits(splits, context, averageRegionSize);
-    } else {
-      return splits;
-    }
-    } finally {
-      if (closeOnFinish) {
-        closeTable();
-      }
-    }
+    return splits;
   }
 
   /**
-   * @deprecated mistakenly made public in 0.98.7. scope will change to package-private
+   * Create n splits for one InputSplit. For now, only a uniform distribution is supported.
+   * @param split A TableSplit corresponding to a range of rowkeys
+   * @param n     Number of ranges after splitting. Pass 1 to leave the range unsplit,
+   *              2 to split the range in two, and so on.
+   * @return A list of TableSplit, the size of the list is n
+   * @throws IllegalArgumentIOException
    */
-  @Deprecated
-  public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
-    String hostName = this.reverseDNSCacheMap.get(ipAddress);
-    if (hostName == null) {
-      String ipAddressString = null;
-      try {
-        ipAddressString = DNS.reverseDns(ipAddress, null);
-      } catch (Exception e) {
-        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
-        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
-        // reverse DNS using jndi doesn't work well with ipv6 addresses.
-        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
+  protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
+      throws IllegalArgumentIOException {
+    if (split == null || !(split instanceof TableSplit)) {
+      throw new IllegalArgumentIOException(
+          "InputSplit for CreateNSplitsPerRegion can not be null "
+              + "and should be an instance of TableSplit");
+    }
+    //if n < 1, then still continue using n = 1
+    n = n < 1 ? 1 : n;
+    List<InputSplit> res = new ArrayList<>(n);
+    if (n == 1) {
+      res.add(split);
+      return res;
+    }
+
+    // Collect Region related information
+    TableSplit ts = (TableSplit) split;
+    TableName tableName = ts.getTable();
+    String regionLocation = ts.getRegionLocation();
+    String encodedRegionName = ts.getEncodedRegionName();
+    long regionSize = ts.getLength();
+    byte[] startRow = ts.getStartRow();
+    byte[] endRow = ts.getEndRow();
+
+    // For special case: startRow or endRow is empty
+    if (startRow.length == 0 && endRow.length == 0){
+      startRow = new byte[1];
+      endRow = new byte[1];
+      startRow[0] = 0;
+      endRow[0] = -1;
+    }
+    if (startRow.length == 0 && endRow.length != 0){
+      startRow = new byte[1];
+      startRow[0] = 0;
+    }
+    if (startRow.length != 0 && endRow.length == 0){
+      endRow = new byte[startRow.length];
+      for (int k = 0; k < startRow.length; k++){
+        endRow[k] = -1;
       }
-      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
-      hostName = Strings.domainNamePointerToHostName(ipAddressString);
-      this.reverseDNSCacheMap.put(ipAddress, hostName);
     }
-    return hostName;
-  }
 
+    // Split Region into n chunks evenly
+    byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
+    for (int i = 0; i < splitKeys.length - 1; i++) {
+      //notice that the regionSize parameter may be not very accurate
+      TableSplit tsplit =
+          new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
+              encodedRegionName, regionSize / n);
+      res.add(tsplit);
+    }
+    return res;
+  }
   /**
    * Calculates the number of MapReduce input splits for the map tasks. The number of
-   * MapReduce input splits depends on the average region size and the "data skew ratio"
user set in
-   * configuration.
+   * MapReduce input splits depends on the average region size.
+   * Made public for testing.
    *
-   * @param list  The list of input splits before balance.
-   * @param context  The current job context.
-   * @param average  The average size of all regions .
+   * @param splits The list of input splits before balance.
+   * @param maxAverageRegionSize the maximum average region size for one mapper
    * @return The list of input splits.
    * @throws IOException When creating the list of splits fails.
    * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
-   *   org.apache.hadoop.mapreduce.JobContext)
+   *   org.apache.hadoop.mapreduce.JobContext)
    */
-  public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
-                                               long average) throws IOException {
-    List<InputSplit> resultList = new ArrayList<InputSplit>();
-    Configuration conf = context.getConfiguration();
-    //The default data skew ratio is 3
-    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
-    //It determines which mode to use: text key mode or binary key mode. The default is text mode.
-    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
-    long dataSkewThreshold = dataSkewRatio * average;
-    int count = 0;
-    while (count < list.size()) {
-      TableSplit ts = (TableSplit)list.get(count);
+  public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize)
+      throws IOException {
+    if (splits.size() == 0) {
+      return splits;
+    }
+    List<InputSplit> resultList = new ArrayList<>();
+    long totalRegionSize = 0;
+    for (int i = 0; i < splits.size(); i++) {
+      TableSplit ts = (TableSplit) splits.get(i);
+      totalRegionSize += ts.getLength();
+    }
+    long averageRegionSize = totalRegionSize / splits.size();
+    // totalRegionSize might overflow, and the averageRegionSize must be positive.
+    if (averageRegionSize <= 0) {
+      LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
+          "set it to Long.MAX_VALUE / " + splits.size());
+      averageRegionSize = Long.MAX_VALUE / splits.size();
+    }
+    // if averageRegionSize is too big, cap it at maxAverageRegionSize (default 8 GB).
+    if (averageRegionSize > maxAverageRegionSize) {
+      averageRegionSize = maxAverageRegionSize;
+    }
+    // if averageRegionSize is too small (less than 64 MB, the default HDFS block size),
+    // we do not need to allocate more mappers for the 'large' regions.
+    if (averageRegionSize < 64 * 1048576) {
+      return splits;
+    }
+    for (int i = 0; i < splits.size(); i++) {
+      TableSplit ts = (TableSplit) splits.get(i);
       TableName tableName = ts.getTable();
       String regionLocation = ts.getRegionLocation();
       String encodedRegionName = ts.getEncodedRegionName();
       long regionSize = ts.getLength();
-      if (regionSize >= dataSkewThreshold) {
-        // if the current region size is large than the data skew threshold,
-        // split the region into two MapReduce input splits.
-        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
-        if (Arrays.equals(ts.getEndRow(), splitKey)) {
-          // Not splitting since the end key is the same as the split key
-          resultList.add(ts);
-        } else {
-          //Set the size of child TableSplit as 1/2 of the region size. The exact size of the
-          // MapReduce input splits is not far off.
-          TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey,
-              regionLocation, regionSize / 2);
-          TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
-              regionSize - regionSize / 2);
-          resultList.add(t1);
-          resultList.add(t2);
-        }
-        count++;
-      } else if (regionSize >= average) {
-        // if the region size between average size and data skew threshold size,
-        // make this region as one MapReduce input split.
-        resultList.add(ts);
-        count++;
+
+      if (regionSize >= averageRegionSize) {
+        // split this region into multiple MapReduce input splits.
+        int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
+        List<InputSplit> temp = createNInputSplitsUniform(ts, n);
+        resultList.addAll(temp);
       } else {
        // if the total size of several small continuous regions less than the average region size,
         // combine them into one MapReduce input split.
         long totalSize = regionSize;
         byte[] splitStartKey = ts.getStartRow();
         byte[] splitEndKey = ts.getEndRow();
-        count++;
-        for (; count < list.size(); count++) {
-          TableSplit nextRegion = (TableSplit)list.get(count);
+        int j = i + 1;
+        while (j < splits.size()) {
+          TableSplit nextRegion = (TableSplit) splits.get(j);
           long nextRegionSize = nextRegion.getLength();
-          if (totalSize + nextRegionSize <= dataSkewThreshold) {
+          if (totalSize + nextRegionSize <= averageRegionSize) {
             totalSize = totalSize + nextRegionSize;
             splitEndKey = nextRegion.getEndRow();
+            j++;
           } else {
             break;
           }
         }
-        TableSplit t = new TableSplit(table.getName(), scan, splitStartKey, splitEndKey,
-                regionLocation, encodedRegionName, totalSize);
+        i = j - 1;
+        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
+            encodedRegionName, totalSize);
         resultList.add(t);
       }
     }
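
To make the fan-out formula above concrete, a small sketch of the split-count arithmetic
(the region sizes and the 300 MB average are invented for illustration):

public class FanOutDemo {
  public static void main(String[] args) {
    long averageRegionSize = 300L * 1048576;                  // 300 MB average
    long[] regionSizes = { 400L * 1048576, 600L * 1048576 };  // two "large" regions
    for (long regionSize : regionSizes) {
      // n = round(ln(regionSize / averageRegionSize) + 1), as in the loop above.
      int n = (int) Math.round(
          Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
      System.out.println((regionSize / 1048576) + " MB -> " + n + " split(s)");
    }
    // Prints: 400 MB -> 1 split(s), then 600 MB -> 2 split(s), since
    // round(ln(4/3) + 1) = round(1.29) = 1 and round(ln(2) + 1) = round(1.69) = 2.
  }
}
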
@@ -446,81 +507,26 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
 
   /**
-   * select a split point in the region. The selection of the split point is based on an uniform
-   * distribution assumption for the keys in a region.
-   * Here are some examples:
-   *
-   * <table>
-   *   <tr>
-   *     <th>start key</th>
-   *     <th>end key</th>
-   *     <th>is text</th>
-   *     <th>split point</th>
-   *   </tr>
-   *   <tr>
-   *     <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td>
-   *     <td>'a', 'a', 'a', 'f', 'f', 'f'</td>
-   *     <td>true</td>
-   *     <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td>
-   *   </tr>
-   *   <tr>
-   *     <td>'1', '1', '1', '0', '0', '0'</td>
-   *     <td>'1', '1', '2', '5', '7', '9', '0'</td>
-   *     <td>true</td>
-   *     <td>'1', '1', '1', -78, -77, -76, -104</td>
-   *   </tr>
-   *   <tr>
-   *     <td>'1', '1', '1', '0'</td>
-   *     <td>'1', '1', '2', '0'</td>
-   *     <td>true</td>
-   *     <td>'1', '1', '1', -80</td>
-   *   </tr>
-   *   <tr>
-   *     <td>13, -19, 126, 127</td>
-   *     <td>13, -19, 127, 0</td>
-   *     <td>false</td>
-   *     <td>13, -19, 126, -65</td>
-   *   </tr>
-   * </table>
-   *
-   * Set this function as "public static", make it easier for test.
-   *
-   * @param start Start key of the region
-   * @param end End key of the region
-   * @param isText It determines to use text key mode or binary key mode
-   * @return The split point in the region.
+   * @deprecated mistakenly made public in 0.98.7. scope will change to package-private
    */
-  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
-    byte upperLimitByte;
-    byte lowerLimitByte;
-    //Use text mode or binary mode.
-    if (isText) {
-      //The range of text char set in ASCII is [32,126], the lower limit is space and the upper
-      // limit is '~'.
-      upperLimitByte = '~';
-      lowerLimitByte = ' ';
-    } else {
-      upperLimitByte = -1;
-      lowerLimitByte = 0;
-    }
-    // For special case
-    // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
-    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
-    if (start.length == 0 && end.length == 0){
-      return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
-    }
-    if (start.length == 0 && end.length != 0){
-      return new byte[]{ end[0] };
-    }
-    if (start.length != 0 && end.length == 0){
-      byte[] result =new byte[start.length];
-      result[0]=start[0];
-      for (int k = 1; k < start.length; k++){
-          result[k] = upperLimitByte;
+  @Deprecated
+  public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
+    String hostName = this.reverseDNSCacheMap.get(ipAddress);
+    if (hostName == null) {
+      String ipAddressString = null;
+      try {
+        ipAddressString = DNS.reverseDns(ipAddress, null);
+      } catch (Exception e) {
+        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
+        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
+        // reverse DNS using jndi doesn't work well with ipv6 addresses.
+        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
       }
-      return result;
+      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
+      hostName = Strings.domainNamePointerToHostName(ipAddressString);
+      this.reverseDNSCacheMap.put(ipAddress, hostName);
     }
-    return Bytes.split(start, end, false, 1)[1];
+    return hostName;
   }
 
   /**
@@ -532,7 +538,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
   * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
   * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing,
-   * continuously. In addition to reducing InputSplits, reduces the load on the region server as 
+   * continuously. In addition to reducing InputSplits, reduces the load on the region server as
    * well, due to the ordering of the keys.
    * <br>
    * <br>
@@ -570,7 +576,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
     return regionLocator;
   }
-  
+
   /**
    * Allows subclasses to get the {@link Table}.
    */
@@ -598,7 +604,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * retreiving an Admin interface to the HBase cluster.
    *
    * @param table  The table to get the data from.
-   * @throws IOException 
+   * @throws IOException
    * @deprecated Use {@link #initializeTable(Connection, TableName)} instead.
    */
   @Deprecated
@@ -629,8 +635,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * Allows subclasses to initialize the table information.
    *
   * @param connection  The {@link Connection} to the HBase cluster. MUST be unmanaged. We will close.
-   * @param tableName  The {@link TableName} of the table to process. 
-   * @throws IOException 
+   * @param tableName  The {@link TableName} of the table to process.
+   * @throws IOException
    */
  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
     if (this.table != null || this.connection != null) {
@@ -671,7 +677,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
     this.tableRecordReader = tableRecordReader;
   }
-  
+
   /**
    * Handle subclass specific set up.
    * Each of the entry points used by the MapReduce framework,

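
createNInputSplitsUniform above delegates the even cutting of a rowkey range to
Bytes.split(start, end, inclusive, num), the same helper the patch calls with (true, n - 1).
A self-contained sketch of that helper (the class name and the keys are arbitrary examples):

import org.apache.hadoop.hbase.util.Bytes;

public class SplitDemo {
  public static void main(String[] args) {
    byte[] start = Bytes.toBytes("aaa");
    byte[] end = Bytes.toBytes("zzz");
    // Three intermediate split points give five boundary keys, hence four ranges,
    // mirroring Bytes.split(startRow, endRow, true, n - 1) with n = 4.
    byte[][] keys = Bytes.split(start, end, true, 3);
    for (int i = 0; i < keys.length - 1; i++) {
      System.out.println("range " + i + ": [" + Bytes.toStringBinary(keys[i])
          + ", " + Bytes.toStringBinary(keys[i + 1]) + ")");
    }
  }
}
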
http://git-wip-us.apache.org/repos/asf/hbase/blob/cbbcb2db/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
index 325e4c4..d16456b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
@@ -100,12 +100,7 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
   }
 
   /**
-   * Tests a MR scan using specific number of mappers. The test table has 25 regions,
-   * and all region sizes are set as 0 as default. The average region size is 1 (the smallest
-   * positive). When we set hbase.mapreduce.input.ratio as -1, all regions will be cut into two
-   * MapRedcue input splits, the number of MR input splits should be 50; when we set hbase
-   .mapreduce.input.ratio as 100, the sum of all region sizes is less then the average region
-   size, all regions will be combined into 1 MapRedcue input split.
+   * Tests a MR scan using a specific number of mappers. The test table has 26 regions.
    *
    * @throws IOException
    * @throws ClassNotFoundException
@@ -113,93 +108,29 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
    */
   @Test
  public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
-    List<HRegionLocation> locs = table.getRegionLocator().getAllRegionLocations();
-
-    testNumOfSplits("-1", locs.size()*2);
-    table.close();
-    testNumOfSplits("100", 1);
+    testNumOfSplits(1, 26);
+    testNumOfSplits(3, 78);
   }
 
   /**
-   * Tests the getSplitKey() method in TableInputFormatBase.java
-   *
+   * Runs an MR job to test TIF using a specific number of mappers. The test table has 26 regions.
+   * @throws InterruptedException
    * @throws IOException
    * @throws ClassNotFoundException
-   * @throws InterruptedException
    */
   @Test
-  public void testGetSplitsPoint() throws IOException, InterruptedException,
-  ClassNotFoundException {
-    byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' };
-    byte[] end1 = { 'a', 'a', 'a', 'f', 'f' };
-    byte[] splitPoint1 = { 'a', 'a', 'a', 'd', 'd', -78, 50, -77  };
-    testGetSplitKey(start1, end1, splitPoint1, true);
-
-    byte[] start2 = { '1', '1', '1', '0', '0', '0' };
-    byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' };
-    byte[] splitPoint2 = { '1', '1', '1',  -78, -77, -76, -104 };
-    testGetSplitKey(start2, end2, splitPoint2, true);
-
-    byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' };
-    byte[] end3 = { 'a', 'a', 'b' };
-    byte[] splitPoint3 = { 'a', 'a', 'a', -80, -80, -80 };
-    testGetSplitKey(start3, end3, splitPoint3, true);
-
-    byte[] start4 = { 'a', 'a', 'a' };
-    byte[] end4 = { 'a', 'a', 'a', 'z' };
-    byte[] splitPoint4 = { 'a', 'a', 'a', '=' };
-    testGetSplitKey(start4, end4, splitPoint4, true);
-
-    byte[] start5 = { 'a', 'a', 'a' };
-    byte[] end5 = { 'a', 'a', 'b', 'a' };
-    byte[] splitPoint5 = { 'a', 'a', 'a', -80 };
-    testGetSplitKey(start5, end5, splitPoint5, true);
-
-    // Test Case 6: empty key and "hhhqqqwww", split point is "h"
-    byte[] start6 = {};
-    byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' };
-    byte[] splitPointText6 = { 'h' };
-    byte[] splitPointBinary6 = { 104 };
-    testGetSplitKey(start6, end6, splitPointText6, true);
-    testGetSplitKey(start6, end6, splitPointBinary6, false);
-
-    // Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose(text key or
-    // binary key).
-    byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' };
-    byte[] end7 = {};
-    byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~'  };
-    byte[] splitPointBinary7 = { 'f', -1, -1, -1, -1, -1, -1  };
-    testGetSplitKey(start7, end7, splitPointText7, true);
-    testGetSplitKey(start7, end7, splitPointBinary7, false);
-
-    // Test Case 8: both start key and end key are empty. Split point depends on the mode we
-    // choose (text key or binary key).
-    byte[] start8 = {};
-    byte[] end8 = {};
-    byte[] splitPointText8 = { 'O' };
-    byte[] splitPointBinary8 = { 0 };
-    testGetSplitKey(start8, end8, splitPointText8, true);
-    testGetSplitKey(start8, end8, splitPointBinary8, false);
-
-    // Test Case 9: Binary Key example
-    byte[] start9 = { 13, -19, 126, 127 };
-    byte[] end9 = { 13, -19, 127, 0 };
-    byte[] splitPoint9 = { 13, -19, 126, -65 };
-    testGetSplitKey(start9, end9, splitPoint9, false);
-
-    // Test Case 10: Binary key split when the start key is an unsigned byte and the end byte is a
-    // signed byte
-    byte[] start10 = { 'x' };
-    byte[] end10 = { -128 };
-    byte[] splitPoint10 = { '|' };
-    testGetSplitKey(start10, end10, splitPoint10, false);
+  public void testSpecifiedNumOfMappersMR()
+      throws InterruptedException, IOException, ClassNotFoundException {
+    testNumOfSplitsMR(2, 52);
+    testNumOfSplitsMR(4, 104);
+  }
 
-    // Test Case 11: Binary key split when the start key is an signed byte and the end byte is a
-    // signed byte
-    byte[] start11 = { -100 };
-    byte[] end11 = { -90 };
-    byte[] splitPoint11 = { -95 };
-    testGetSplitKey(start11, end11, splitPoint11, false);
+  /**
+   * Test if autoBalance creates correct splits
+   * @throws IOException
+   */
+  @Test
+  public void testAutoBalanceSplits() throws IOException {
+    testAutobalanceNumOfSplit();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cbbcb2db/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
index 6521d2d..b0ae937 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
@@ -19,13 +19,11 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.NavigableMap;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,10 +37,13 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -66,6 +67,7 @@ public abstract class TestTableInputFormatScanBase {
 
   static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
   static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
   static final String KEY_STARTROW = "startRow";
   static final String KEY_LASTROW = "stpRow";
 
@@ -245,40 +247,86 @@ public abstract class TestTableInputFormatScanBase {
 
 
   /**
-   * Tests a MR scan using data skew auto-balance
+   * Tests the number of InputSplits for an MR job when a specific number of mappers per region
+   * is set for TableInputFormatXXX. This test does not run an MR job.
    *
    * @throws IOException
    * @throws ClassNotFoundException
    * @throws InterruptedException
    */
-  public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
-          InterruptedException,
-          ClassNotFoundException {
+  public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
+      InterruptedException,
+      ClassNotFoundException {
     String jobName = "TestJobForNumOfSplits";
     LOG.info("Before map/reduce startup - job " + jobName);
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     Scan scan = new Scan();
     scan.addFamily(INPUT_FAMILY);
-    c.set("hbase.mapreduce.input.autobalance", "true");
-    c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
+    c.setInt("hbase.mapreduce.input.mappers.per.region", splitsPerRegion);
     c.set(KEY_STARTROW, "");
     c.set(KEY_LASTROW, "");
     Job job = new Job(c, jobName);
-    TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
-            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    TableMapReduceUtil.initTableMapperJob(TableName.valueOf(TABLE_NAME).getNameAsString(), scan, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
     TableInputFormat tif = new TableInputFormat();
     tif.setConf(job.getConfiguration());
-    Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
     List<InputSplit> splits = tif.getSplits(job);
     Assert.assertEquals(expectedNumOfSplits, splits.size());
   }
 
   /**
-   * Tests for the getSplitKey() method in TableInputFormatBase.java
+   * Run an MR job to check that the number of mappers equals expectedNumOfSplits.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
    */
-  public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
-    byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
-      Assert.assertArrayEquals(splitKey, result);
+  public void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
+      InterruptedException,
+      ClassNotFoundException {
+    String jobName = "TestJobForNumOfSplits-MR";
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    c.setInt("hbase.mapreduce.input.mappers.per.region", splitsPerRegion);
+    Job job = new Job(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    assertTrue("job failed!", job.waitForCompletion(true));
+    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
+    // we use TaskCounter.SHUFFLED_MAPS to get total launched maps
+    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
+        job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
+  }
+
+  /**
+   * Run MR job to test autobalance for setting number of mappers for TIF
+   * This does not run real MR job
+   */
+  public void testAutobalanceNumOfSplit() throws IOException {
+    // set up splits for testing
+    List<InputSplit> splits = new ArrayList<>(5);
+    int[] regionLen = {100, 200, 200, 400, 600};
+    for (int i = 0; i < 5; i++) {
+      InputSplit split = new TableSplit(TableName.valueOf(TABLE_NAME), new Scan(),
+          Bytes.toBytes(i), Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
+      splits.add(split);
+    }
+    TableInputFormat tif = new TableInputFormat();
+    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
+
+    assertEquals("Saw the wrong number of splits", 5, res.size());
+    TableSplit ts1 = (TableSplit) res.get(0);
+    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
+    TableSplit ts2 = (TableSplit) res.get(1);
+    assertEquals("The second split regionsize should be", 200 * 1048576, ts2.getLength());
+    TableSplit ts3 = (TableSplit) res.get(2);
+    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
+    TableSplit ts4 = (TableSplit) res.get(4);
+    assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
   }
 }
 
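
For reference, the expected values in testAutobalanceNumOfSplit follow from the arithmetic in
calculateAutoBalancedSplits: the five regions total 1500 MB, so the average is 300 MB (under
the 1 GB cap and over the 64 MB floor). The 100 MB region and the first 200 MB region combine
(100 + 200 <= 300) into one split ending at key 2; the second 200 MB region cannot absorb its
400 MB neighbor, so it stands alone; the 400 MB region yields round(ln(400/300) + 1) = 1 split
starting at key 3; and the 600 MB region yields round(ln(2) + 1) = 2 splits, so the fifth and
final split starts at the midpoint of [4, 5) rather than at key 4.
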

http://git-wip-us.apache.org/repos/asf/hbase/blob/cbbcb2db/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
index f5efa34..3af2ca9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
@@ -392,9 +392,7 @@ public class TestNamespaceAuditor {
     assertEquals(initialRegions, hris.size());
     Collections.sort(hris);
     // verify that we cannot split
-    HRegionInfo hriToSplit2 = hris.get(1);
-    ADMIN.split(tableTwo,
-      TableInputFormatBase.getSplitKey(hriToSplit2.getStartKey(), hriToSplit2.getEndKey(), true));
+    ADMIN.split(tableTwo, Bytes.toBytes("6"));
     waitForMergeToComplete(tableTwo, encodedRegionNamesToMerge);
     assertEquals(initialRegions, ADMIN.getTableRegions(tableTwo).size());
   }

