Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 51FB4200D26 for ; Fri, 6 Oct 2017 06:14:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 506E2160BDE; Fri, 6 Oct 2017 04:14:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 885C0160BDB for ; Fri, 6 Oct 2017 06:14:53 +0200 (CEST) Received: (qmail 16956 invoked by uid 500); 6 Oct 2017 04:14:48 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 12097 invoked by uid 99); 6 Oct 2017 04:14:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Oct 2017 04:14:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9B417F5D01; Fri, 6 Oct 2017 04:14:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: busbey@apache.org To: commits@hbase.apache.org Date: Fri, 06 Oct 2017 04:15:31 -0000 Message-Id: <2170f300ea2a46d982eb1a7df4482fb7@git.apache.org> In-Reply-To: <3973999eb3d4471fb3e72950ef9c8675@git.apache.org> References: <3973999eb3d4471fb3e72950ef9c8675@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [52/60] [abbrv] hbase git commit: HBASE-16894 Create more than 1 split per region, generalize HBASE-12590 archived-at: Fri, 06 Oct 2017 04:14:55 -0000 HBASE-16894 Create more than 1 split per region, generalize HBASE-12590 Signed-off-by: Andrew Purtell Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/16d483f9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/16d483f9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/16d483f9 Branch: refs/heads/HBASE-18467 Commit: 16d483f9003ddee71404f37ce7694003d1a18ac4 Parents: 56830c3 Author: Yi Liang Authored: Wed Sep 13 11:38:29 2017 -0700 Committer: Andrew Purtell Committed: Tue Oct 3 17:11:06 2017 -0700 ---------------------------------------------------------------------- .../hbase/mapreduce/TableInputFormatBase.java | 473 ++++++++++--------- .../mapreduce/TestTableInputFormatScan1.java | 103 +--- .../mapreduce/TestTableInputFormatScanBase.java | 83 +++- 3 files changed, 324 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/16d483f9/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index b3be90b..e7a65e8 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -24,13 +24,12 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -93,20 +93,21 @@ import org.apache.hadoop.util.StringUtils; * } * } * + * + * + * The number of InputSplits(mappers) match the number of regions in a table by default. + * Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers per region, set + * this property will disable autobalance below.\ + * Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance, hbase will assign mappers + * based on average region size; For regions, whose size larger than average region size may assigned + * more mappers, and for smaller one, they may group together to use one mapper. If actual average + * region size is too big, like 50G, it is not good to only assign 1 mapper for those large regions. + * Use "hbase.mapreduce.tif.ave.regionsize" to set max average region size when enable "autobalanece", + * default mas average region size is 8G. */ @InterfaceAudience.Public public abstract class TableInputFormatBase -extends InputFormat { - - /** Specify if we enable auto-balance for input in M/R jobs.*/ - public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance"; - /** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce - * .input.autobalance property.*/ - public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" + - ".maxskewratio"; - /** Specify if the row key in table is text (ASCII between 32~126), - * default is true. False means the table is using binary row key*/ - public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey"; + extends InputFormat { private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); @@ -114,8 +115,17 @@ extends InputFormat { "initialized. Ensure you call initializeTable either in your constructor or initialize " + "method"; private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + - " previous error. Please look at the previous logs lines from" + - " the task's full log for more details."; + " previous error. Please look at the previous logs lines from" + + " the task's full log for more details."; + + /** Specify if we enable auto-balance to set number of mappers in M/R jobs. */ + public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance"; + /** In auto-balance, we split input by ave region size, if calculated region size is too big, we can set it. */ + public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize"; + + /** Set the number of Mappers for each region, all regions have same number of Mappers */ + public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region"; + /** Holds the details for the internal scanner. * @@ -134,7 +144,8 @@ extends InputFormat { /** The reverse DNS lookup cache mapping: IPAddress => HostName */ - private HashMap reverseDNSCacheMap = new HashMap<>(); + private HashMap reverseDNSCacheMap = + new HashMap<>(); /** * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses @@ -151,7 +162,7 @@ extends InputFormat { @Override public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) - throws IOException { + throws IOException { // Just in case a subclass is relying on JobConfigurable magic. if (table == null) { initialize(context); @@ -215,9 +226,7 @@ extends InputFormat { } /** - * Calculates the splits that will serve as input for the map tasks. The - * number of splits matches the number of regions in a table. - * + * Calculates the splits that will serve as input for the map tasks. * @param context The current job context. * @return The list of input splits. * @throws IOException When creating the list of splits fails. @@ -245,269 +254,263 @@ extends InputFormat { } try { - RegionSizeCalculator sizeCalculator = - new RegionSizeCalculator(getRegionLocator(), getAdmin()); - - TableName tableName = getTable().getName(); - - Pair keys = getStartEndKeys(); - if (keys == null || keys.getFirst() == null || - keys.getFirst().length == 0) { - HRegionLocation regLoc = - getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); - if (null == regLoc) { - throw new IOException("Expecting at least one region."); + List splits = oneInputSplitPerRegion(); + + // set same number of mappers for each region + if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) { + int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1); + List res = new ArrayList<>(); + for (int i = 0; i < splits.size(); i++) { + List tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion); + res.addAll(tmp); } - List splits = new ArrayList<>(1); - long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); - TableSplit split = new TableSplit(tableName, scan, - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); - splits.add(split); - return splits; + return res; + } + + //The default value of "hbase.mapreduce.input.autobalance" is false. + if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false) != false) { + long maxAveRegionSize = context.getConfiguration().getInt(MAX_AVERAGE_REGION_SIZE, 8*1073741824); + return calculateAutoBalancedSplits(splits, maxAveRegionSize); } - List splits = new ArrayList<>(keys.getFirst().length); - for (int i = 0; i < keys.getFirst().length; i++) { - if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { - continue; - } - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - // determine if the given start an stop key fall into the region - if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || - Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && - (stopRow.length == 0 || - Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { - byte[] splitStart = startRow.length == 0 || + // return one mapper per region + return splits; + } finally { + if (closeOnFinish) { + closeTable(); + } + } + } + + /** + * Create one InputSplit per region + * + * @return The list of InputSplit for all the regions + * @throws IOException + */ + private List oneInputSplitPerRegion() throws IOException { + RegionSizeCalculator sizeCalculator = + new RegionSizeCalculator(getRegionLocator(), getAdmin()); + + TableName tableName = getTable().getName(); + + Pair keys = getStartEndKeys(); + if (keys == null || keys.getFirst() == null || + keys.getFirst().length == 0) { + HRegionLocation regLoc = + getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + List splits = new ArrayList<>(1); + long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); + TableSplit split = new TableSplit(tableName, scan, + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc + .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); + splits.add(split); + return splits; + } + List splits = new ArrayList<>(keys.getFirst().length); + for (int i = 0; i < keys.getFirst().length; i++) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { + byte[] splitStart = startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? - keys.getFirst()[i] : startRow; - byte[] splitStop = (stopRow.length == 0 || + keys.getFirst()[i] : startRow; + byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && keys.getSecond()[i].length > 0 ? - keys.getSecond()[i] : stopRow; + keys.getSecond()[i] : stopRow; - HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); - // The below InetSocketAddress creation does a name resolution. - InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); - if (isa.isUnresolved()) { - LOG.warn("Failed resolve " + isa); - } - InetAddress regionAddress = isa.getAddress(); - String regionLocation; - regionLocation = reverseDNS(regionAddress); - - byte[] regionName = location.getRegionInfo().getRegionName(); - String encodedRegionName = location.getRegionInfo().getEncodedName(); - long regionSize = sizeCalculator.getRegionSize(regionName); - TableSplit split = new TableSplit(tableName, scan, - splitStart, splitStop, regionLocation, encodedRegionName, regionSize); - splits.add(split); - if (LOG.isDebugEnabled()) { - LOG.debug("getSplits: split -> " + i + " -> " + split); - } - } - } - //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled. - boolean enableAutoBalance = context.getConfiguration() - .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false); - if (enableAutoBalance) { - long totalRegionSize=0; - for (int i = 0; i < splits.size(); i++){ - TableSplit ts = (TableSplit)splits.get(i); - totalRegionSize += ts.getLength(); + HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); + // The below InetSocketAddress creation does a name resolution. + InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); + if (isa.isUnresolved()) { + LOG.warn("Failed resolve " + isa); } - long averageRegionSize = totalRegionSize / splits.size(); - // the averageRegionSize must be positive. - if (averageRegionSize <= 0) { - LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " + - "set it to 1."); - averageRegionSize = 1; + InetAddress regionAddress = isa.getAddress(); + String regionLocation; + regionLocation = reverseDNS(regionAddress); + + byte[] regionName = location.getRegionInfo().getRegionName(); + String encodedRegionName = location.getRegionInfo().getEncodedName(); + long regionSize = sizeCalculator.getRegionSize(regionName); + TableSplit split = new TableSplit(tableName, scan, + splitStart, splitStop, regionLocation, encodedRegionName, regionSize); + splits.add(split); + if (LOG.isDebugEnabled()) { + LOG.debug("getSplits: split -> " + i + " -> " + split); } - return calculateRebalancedSplits(splits, context, averageRegionSize); - } else { - return splits; - } - } finally { - if (closeOnFinish) { - closeTable(); } } + return splits; } - String reverseDNS(InetAddress ipAddress) throws UnknownHostException { - String hostName = this.reverseDNSCacheMap.get(ipAddress); - if (hostName == null) { - String ipAddressString = null; - try { - ipAddressString = DNS.reverseDns(ipAddress, null); - } catch (Exception e) { - // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the - // name service. Also, in case of ipv6, we need to use the InetAddress since resolving - // reverse DNS using jndi doesn't work well with ipv6 addresses. - ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName(); + /** + * Create n splits for one InputSplit, For now only support uniform distribution + * @param split A TableSplit corresponding to a range of rowkeys + * @param n Number of ranges after splitting. Pass 1 means no split for the range + * Pass 2 if you want to split the range in two; + * @return A list of TableSplit, the size of the list is n + * @throws IllegalArgumentIOException + */ + protected List createNInputSplitsUniform(InputSplit split, int n) + throws IllegalArgumentIOException { + if (split == null || !(split instanceof TableSplit)) { + throw new IllegalArgumentIOException( + "InputSplit for CreateNSplitsPerRegion can not be null + " + + "and should be instance of TableSplit"); + } + //if n < 1, then still continue using n = 1 + n = n < 1 ? 1 : n; + List res = new ArrayList<>(n); + if (n == 1) { + res.add(split); + return res; + } + + // Collect Region related information + TableSplit ts = (TableSplit) split; + TableName tableName = ts.getTable(); + String regionLocation = ts.getRegionLocation(); + String encodedRegionName = ts.getEncodedRegionName(); + long regionSize = ts.getLength(); + byte[] startRow = ts.getStartRow(); + byte[] endRow = ts.getEndRow(); + + // For special case: startRow or endRow is empty + if (startRow.length == 0 && endRow.length == 0){ + startRow = new byte[1]; + endRow = new byte[1]; + startRow[0] = 0; + endRow[0] = -1; + } + if (startRow.length == 0 && endRow.length != 0){ + startRow = new byte[1]; + startRow[0] = 0; + } + if (startRow.length != 0 && endRow.length == 0){ + endRow =new byte[startRow.length]; + for (int k = 0; k < startRow.length; k++){ + endRow[k] = -1; } - if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress); - hostName = Strings.domainNamePointerToHostName(ipAddressString); - this.reverseDNSCacheMap.put(ipAddress, hostName); } - return hostName; - } + // Split Region into n chunks evenly + byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1); + for (int i = 0; i < splitKeys.length - 1; i++) { + //notice that the regionSize parameter may be not very accurate + TableSplit tsplit = + new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation, + encodedRegionName, regionSize / n); + res.add(tsplit); + } + return res; + } /** * Calculates the number of MapReduce input splits for the map tasks. The number of - * MapReduce input splits depends on the average region size and the "data skew ratio" user set in - * configuration. + * MapReduce input splits depends on the average region size. + * Make it 'public' for testing * - * @param list The list of input splits before balance. - * @param context The current job context. - * @param average The average size of all regions . + * @param splits The list of input splits before balance. + * @param maxAverageRegionSize max Average region size for one mapper * @return The list of input splits. * @throws IOException When creating the list of splits fails. * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( - * org.apache.hadoop.mapreduce.JobContext) + *org.apache.hadoop.mapreduce.JobContext) */ - private List calculateRebalancedSplits(List list, JobContext context, - long average) throws IOException { + public List calculateAutoBalancedSplits(List splits, long maxAverageRegionSize) + throws IOException { + if (splits.size() == 0) { + return splits; + } List resultList = new ArrayList<>(); - Configuration conf = context.getConfiguration(); - //The default data skew ratio is 3 - long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3); - //It determines which mode to use: text key mode or binary key mode. The default is text mode. - boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true); - long dataSkewThreshold = dataSkewRatio * average; - int count = 0; - while (count < list.size()) { - TableSplit ts = (TableSplit)list.get(count); + long totalRegionSize = 0; + for (int i = 0; i < splits.size(); i++) { + TableSplit ts = (TableSplit) splits.get(i); + totalRegionSize += ts.getLength(); + } + long averageRegionSize = totalRegionSize / splits.size(); + // totalRegionSize might be overflow, and the averageRegionSize must be positive. + if (averageRegionSize <= 0) { + LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " + + "set it to Long.MAX_VALUE " + splits.size()); + averageRegionSize = Long.MAX_VALUE / splits.size(); + } + //if averageRegionSize is too big, change it to default as 1 GB, + if (averageRegionSize > maxAverageRegionSize) { + averageRegionSize = maxAverageRegionSize; + } + // if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' region + // set default as 16M = (default hdfs block size) / 4; + if (averageRegionSize < 16 * 1048576) { + return splits; + } + for (int i = 0; i < splits.size(); i++) { + TableSplit ts = (TableSplit) splits.get(i); TableName tableName = ts.getTable(); String regionLocation = ts.getRegionLocation(); String encodedRegionName = ts.getEncodedRegionName(); long regionSize = ts.getLength(); - if (regionSize >= dataSkewThreshold) { - // if the current region size is large than the data skew threshold, - // split the region into two MapReduce input splits. - byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey); - if (Arrays.equals(ts.getEndRow(), splitKey)) { - // Not splitting since the end key is the same as the split key - resultList.add(ts); - } else { - //Set the size of child TableSplit as 1/2 of the region size. The exact size of the - // MapReduce input splits is not far off. - TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, - regionLocation, regionSize / 2); - TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation, - regionSize - regionSize / 2); - resultList.add(t1); - resultList.add(t2); - } - count++; - } else if (regionSize >= average) { - // if the region size between average size and data skew threshold size, - // make this region as one MapReduce input split. - resultList.add(ts); - count++; + + if (regionSize >= averageRegionSize) { + // make this region as multiple MapReduce input split. + int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0); + List temp = createNInputSplitsUniform(ts, n); + resultList.addAll(temp); } else { // if the total size of several small continuous regions less than the average region size, // combine them into one MapReduce input split. long totalSize = regionSize; byte[] splitStartKey = ts.getStartRow(); byte[] splitEndKey = ts.getEndRow(); - count++; - for (; count < list.size(); count++) { - TableSplit nextRegion = (TableSplit)list.get(count); + int j = i + 1; + while (j < splits.size()) { + TableSplit nextRegion = (TableSplit) splits.get(j); long nextRegionSize = nextRegion.getLength(); - if (totalSize + nextRegionSize <= dataSkewThreshold) { + if (totalSize + nextRegionSize <= averageRegionSize) { totalSize = totalSize + nextRegionSize; splitEndKey = nextRegion.getEndRow(); + j++; } else { break; } } - TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, - regionLocation, encodedRegionName, totalSize); + i = j - 1; + TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation, + encodedRegionName, totalSize); resultList.add(t); } } return resultList; } - /** - * select a split point in the region. The selection of the split point is based on an uniform - * distribution assumption for the keys in a region. - * Here are some examples: - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
start keyend keyis textsplit point
'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g''a', 'a', 'a', 'f', 'f', 'f'true'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51
'1', '1', '1', '0', '0', '0''1', '1', '2', '5', '7', '9', '0'true'1', '1', '1', -78, -77, -76, -104
'1', '1', '1', '0''1', '1', '2', '0'true'1', '1', '1', -80
13, -19, 126, 12713, -19, 127, 0false13, -19, 126, -65
- * - * Set this function as "public static", make it easier for test. - * - * @param start Start key of the region - * @param end End key of the region - * @param isText It determines to use text key mode or binary key mode - * @return The split point in the region. - */ - @InterfaceAudience.Private - public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) { - byte upperLimitByte; - byte lowerLimitByte; - //Use text mode or binary mode. - if (isText) { - //The range of text char set in ASCII is [32,126], the lower limit is space and the upper - // limit is '~'. - upperLimitByte = '~'; - lowerLimitByte = ' '; - } else { - upperLimitByte = -1; - lowerLimitByte = 0; - } - // For special case - // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h" - // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~" - if (start.length == 0 && end.length == 0){ - return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)}; - } - if (start.length == 0 && end.length != 0){ - return new byte[]{ end[0] }; - } - if (start.length != 0 && end.length == 0){ - byte[] result =new byte[start.length]; - result[0]=start[0]; - for (int k = 1; k < start.length; k++){ - result[k] = upperLimitByte; + String reverseDNS(InetAddress ipAddress) throws UnknownHostException { + String hostName = this.reverseDNSCacheMap.get(ipAddress); + if (hostName == null) { + String ipAddressString = null; + try { + ipAddressString = DNS.reverseDns(ipAddress, null); + } catch (Exception e) { + // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the + // name service. Also, in case of ipv6, we need to use the InetAddress since resolving + // reverse DNS using jndi doesn't work well with ipv6 addresses. + ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName(); } - return result; + if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress); + hostName = Strings.domainNamePointerToHostName(ipAddressString); + this.reverseDNSCacheMap.put(ipAddress, hostName); } - return Bytes.split(start, end, false, 1)[1]; + return hostName; } /** @@ -649,4 +652,4 @@ extends InputFormat { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/16d483f9/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java index 99b40b9..553869e 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java @@ -98,103 +98,38 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase { } /** - * Tests a MR scan using specific number of mappers. The test table has 25 regions, - * and all region sizes are set as 0 as default. The average region size is 1 (the smallest - * positive). When we set hbase.mapreduce.input.ratio as -1, all regions will be cut into two - * MapRedcue input splits, the number of MR input splits should be 50; when we set hbase - * .mapreduce.input.ratio as 100, the sum of all region sizes is less then the average region - * size, all regions will be combined into 1 MapRedcue input split. + * Tests a MR scan using specific number of mappers. The test table has 26 regions, * * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ @Test - public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException { - testNumOfSplits("-1", 52); - testNumOfSplits("100", 1); - } + public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException { + testNumOfSplits(1, 26); + testNumOfSplits(3, 78); + } /** - * Tests the getSplitKey() method in TableInputFormatBase.java - * + * Runs a MR to test TIF using specific number of mappers. The test table has 26 regions, + * @throws InterruptedException * @throws IOException * @throws ClassNotFoundException - * @throws InterruptedException */ @Test - public void testGetSplitsPoint() throws IOException, InterruptedException, - ClassNotFoundException { - byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' }; - byte[] end1 = { 'a', 'a', 'a', 'f', 'f' }; - byte[] splitPoint1 = { 'a', 'a', 'a', 'd', 'd', -78, 50, -77 }; - testGetSplitKey(start1, end1, splitPoint1, true); - - byte[] start2 = { '1', '1', '1', '0', '0', '0' }; - byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' }; - byte[] splitPoint2 = { '1', '1', '1', -78, -77, -76, -104 }; - testGetSplitKey(start2, end2, splitPoint2, true); - - byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' }; - byte[] end3 = { 'a', 'a', 'b' }; - byte[] splitPoint3 = { 'a', 'a', 'a', -80, -80, -80 }; - testGetSplitKey(start3, end3, splitPoint3, true); - - byte[] start4 = { 'a', 'a', 'a' }; - byte[] end4 = { 'a', 'a', 'a', 'z' }; - byte[] splitPoint4 = { 'a', 'a', 'a', '=' }; - testGetSplitKey(start4, end4, splitPoint4, true); - - byte[] start5 = { 'a', 'a', 'a' }; - byte[] end5 = { 'a', 'a', 'b', 'a' }; - byte[] splitPoint5 = { 'a', 'a', 'a', -80 }; - testGetSplitKey(start5, end5, splitPoint5, true); - - // Test Case 6: empty key and "hhhqqqwww", split point is "h" - byte[] start6 = {}; - byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' }; - byte[] splitPointText6 = { 'h' }; - byte[] splitPointBinary6 = { 104 }; - testGetSplitKey(start6, end6, splitPointText6, true); - testGetSplitKey(start6, end6, splitPointBinary6, false); - - // Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose(text key or - // binary key). - byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' }; - byte[] end7 = {}; - byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~' }; - byte[] splitPointBinary7 = { 'f', -1, -1, -1, -1, -1, -1 }; - testGetSplitKey(start7, end7, splitPointText7, true); - testGetSplitKey(start7, end7, splitPointBinary7, false); - - // Test Case 8: both start key and end key are empty. Split point depends on the mode we - // choose (text key or binary key). - byte[] start8 = {}; - byte[] end8 = {}; - byte[] splitPointText8 = { 'O' }; - byte[] splitPointBinary8 = { 0 }; - testGetSplitKey(start8, end8, splitPointText8, true); - testGetSplitKey(start8, end8, splitPointBinary8, false); - - // Test Case 9: Binary Key example - byte[] start9 = { 13, -19, 126, 127 }; - byte[] end9 = { 13, -19, 127, 0 }; - byte[] splitPoint9 = { 13, -19, 126, -65 }; - testGetSplitKey(start9, end9, splitPoint9, false); - - // Test Case 10: Binary key split when the start key is an unsigned byte and the end byte is a - // signed byte - byte[] start10 = { 'x' }; - byte[] end10 = { -128 }; - byte[] splitPoint10 = { '|' }; - testGetSplitKey(start10, end10, splitPoint10, false); + public void testSpecifiedNumOfMappersMR() + throws InterruptedException, IOException, ClassNotFoundException { + testNumOfSplitsMR(2, 52); + testNumOfSplitsMR(4, 104); + } - // Test Case 11: Binary key split when the start key is an signed byte and the end byte is a - // signed byte - byte[] start11 = { -100 }; - byte[] end11 = { -90 }; - byte[] splitPoint11 = { -95 }; - testGetSplitKey(start11, end11, splitPoint11, false); + /** + * Test if autoBalance create correct splits + * @throws IOException + */ + @Test + public void testAutoBalanceSplits() throws IOException { + testAutobalanceNumOfSplit(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/16d483f9/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index 13b6a96..d127adb 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.NavigableMap; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,10 +37,13 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -134,7 +135,7 @@ public abstract class TestTableInputFormatScanBase { */ public static class ScanReducer extends Reducer { + NullWritable, NullWritable> { private String first = null; private String last = null; @@ -247,28 +248,28 @@ public abstract class TestTableInputFormatScanBase { /** - * Tests a MR scan using data skew auto-balance + * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX + * This test does not run MR job * * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ - public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException, - InterruptedException, - ClassNotFoundException { + public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws IOException, + InterruptedException, + ClassNotFoundException { String jobName = "TestJobForNumOfSplits"; LOG.info("Before map/reduce startup - job " + jobName); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); Scan scan = new Scan(); scan.addFamily(INPUT_FAMILYS[0]); scan.addFamily(INPUT_FAMILYS[1]); - c.set("hbase.mapreduce.input.autobalance", "true"); - c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio); + c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); c.set(KEY_STARTROW, ""); c.set(KEY_LASTROW, ""); Job job = new Job(c, jobName); TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, - ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); TableInputFormat tif = new TableInputFormat(); tif.setConf(job.getConfiguration()); Assert.assertEquals(TABLE_NAME, table.getName()); @@ -277,11 +278,61 @@ public abstract class TestTableInputFormatScanBase { } /** - * Tests for the getSplitKey() method in TableInputFormatBase.java + * Run MR job to check the number of mapper = expectedNumOfSplits + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException */ - public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) { - byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText); - Assert.assertArrayEquals(splitKey, result); + public void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) throws IOException, + InterruptedException, + ClassNotFoundException { + String jobName = "TestJobForNumOfSplits-MR"; + LOG.info("Before map/reduce startup - job " + jobName); + JobConf c = new JobConf(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILYS[0]); + scan.addFamily(INPUT_FAMILYS[1]); + c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); + c.set(KEY_STARTROW, ""); + c.set(KEY_LASTROW, ""); + Job job = Job.getInstance(c, jobName); + TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); + job.setOutputFormatClass(NullOutputFormat.class); + assertTrue("job failed!", job.waitForCompletion(true)); + // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS, + // we use TaskCounter.SHUFFLED_MAPS to get total launched maps + assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits, + job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue()); + } + + /** + * Run MR job to test autobalance for setting number of mappers for TIF + * This does not run real MR job + */ + public void testAutobalanceNumOfSplit() throws IOException { + // set up splits for testing + List splits = new ArrayList<>(5); + int[] regionLen = {10, 20, 20, 40, 60}; + for (int i = 0; i < 5; i++) { + InputSplit split = new TableSplit(TABLE_NAME, new Scan(), + Bytes.toBytes(i), Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576); + splits.add(split); + } + TableInputFormat tif = new TableInputFormat(); + List res = tif.calculateAutoBalancedSplits(splits, 1073741824); + + assertEquals("Saw the wrong number of splits", 5, res.size()); + TableSplit ts1 = (TableSplit) res.get(0); + assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow())); + TableSplit ts2 = (TableSplit) res.get(1); + assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength()); + TableSplit ts3 = (TableSplit) res.get(2); + assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow())); + TableSplit ts4 = (TableSplit) res.get(4); + assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow())); } }