From: git-site-role@apache.org
To: commits@hbase.apache.org
Date: Wed, 04 Oct 2017 15:13:43 -0000
Subject: [39/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/387c1112/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html
----------------------------------------------------------------------
diff --git a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html
index 2a28959..898d59a 100644
--- a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html
+++ b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html
@@ -32,23 +32,23 @@
024import java.net.InetSocketAddress;
025import java.net.UnknownHostException;
026import java.util.ArrayList;
-027import java.util.Arrays;
-028import java.util.HashMap;
-029import java.util.List;
+027import java.util.HashMap;
+028import java.util.List;
+029import java.util.Map;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
-033import org.apache.hadoop.conf.Configuration;
-034import org.apache.yetus.audience.InterfaceAudience;
-035import org.apache.hadoop.hbase.HConstants;
-036import org.apache.hadoop.hbase.HRegionLocation;
-037import org.apache.hadoop.hbase.TableName;
-038import org.apache.hadoop.hbase.client.Admin;
-039import org.apache.hadoop.hbase.client.Connection;
-040import org.apache.hadoop.hbase.client.RegionLocator;
-041import org.apache.hadoop.hbase.client.Result;
-042import org.apache.hadoop.hbase.client.Scan;
-043import org.apache.hadoop.hbase.client.Table;
+033import org.apache.yetus.audience.InterfaceAudience;
+034import org.apache.hadoop.hbase.HConstants;
+035import org.apache.hadoop.hbase.HRegionLocation;
+036import org.apache.hadoop.hbase.TableName;
+037import org.apache.hadoop.hbase.client.Admin;
+038import 
org.apache.hadoop.hbase.client.Connection; +039import org.apache.hadoop.hbase.client.RegionLocator; +040import org.apache.hadoop.hbase.client.Result; +041import org.apache.hadoop.hbase.client.Scan; +042import org.apache.hadoop.hbase.client.Table; +043import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 044import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 045import org.apache.hadoop.hbase.util.Addressing; 046import org.apache.hadoop.hbase.util.Bytes; @@ -101,563 +101,566 @@ 093 * } 094 * } 095 * </pre> -096 */ -097@InterfaceAudience.Public -098public abstract class TableInputFormatBase -099extends InputFormat<ImmutableBytesWritable, Result> { -100 -101 /** Specify if we enable auto-balance for input in M/R jobs.*/ -102 public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance"; -103 /** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce -104 * .input.autobalance property.*/ -105 public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" + -106 ".maxskewratio"; -107 /** Specify if the row key in table is text (ASCII between 32~126), -108 * default is true. False means the table is using binary row key*/ -109 public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey"; -110 -111 private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); -112 -113 private static final String NOT_INITIALIZED = "The input format instance has not been properly " + -114 "initialized. Ensure you call initializeTable either in your constructor or initialize " + -115 "method"; -116 private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + -117 " previous error. Please look at the previous logs lines from" + -118 " the task's full log for more details."; -119 -120 /** Holds the details for the internal scanner. -121 * -122 * @see Scan */ -123 private Scan scan = null; -124 /** The {@link Admin}. */ -125 private Admin admin; -126 /** The {@link Table} to scan. */ -127 private Table table; -128 /** The {@link RegionLocator} of the table. */ -129 private RegionLocator regionLocator; -130 /** The reader scanning the table, can be a custom one. */ -131 private TableRecordReader tableRecordReader = null; -132 /** The underlying {@link Connection} of the table. */ -133 private Connection connection; -134 -135 -136 /** The reverse DNS lookup cache mapping: IPAddress => HostName */ -137 private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>(); -138 -139 /** -140 * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses -141 * the default. -142 * -143 * @param split The split to work with. -144 * @param context The current context. -145 * @return The newly created record reader. -146 * @throws IOException When creating the reader fails. -147 * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( -148 * org.apache.hadoop.mapreduce.InputSplit, -149 * org.apache.hadoop.mapreduce.TaskAttemptContext) -150 */ -151 @Override -152 public RecordReader<ImmutableBytesWritable, Result> createRecordReader( -153 InputSplit split, TaskAttemptContext context) -154 throws IOException { -155 // Just in case a subclass is relying on JobConfigurable magic. -156 if (table == null) { -157 initialize(context); -158 } -159 // null check in case our child overrides getTable to not throw. 
-160 try {
-161 if (getTable() == null) {
-162 // initialize() must not have been implemented in the subclass.
-163 throw new IOException(INITIALIZATION_ERROR);
-164 }
-165 } catch (IllegalStateException exception) {
-166 throw new IOException(INITIALIZATION_ERROR, exception);
-167 }
-168 TableSplit tSplit = (TableSplit) split;
-169 LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
-170 final TableRecordReader trr =
-171 this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
-172 Scan sc = new Scan(this.scan);
-173 sc.setStartRow(tSplit.getStartRow());
-174 sc.setStopRow(tSplit.getEndRow());
-175 trr.setScan(sc);
-176 trr.setTable(getTable());
-177 return new RecordReader<ImmutableBytesWritable, Result>() {
-178
-179 @Override
-180 public void close() throws IOException {
-181 trr.close();
-182 closeTable();
-183 }
-184
-185 @Override
-186 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
-187 return trr.getCurrentKey();
-188 }
+096 *
+097 *
+098 * The number of InputSplits (mappers) matches the number of regions in a table by default.
+099 * Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers to use per
+100 * region; setting this property disables the autobalance behavior below.
+101 * Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance; HBase will then assign
+102 * mappers based on the average region size: regions larger than the average may be assigned
+103 * more mappers, while smaller ones may be grouped together to share one mapper. If the actual
+104 * average region size is very large, e.g. 50G, it is not good to assign only one mapper to each
+105 * such region. Use "hbase.mapreduce.tif.ave.regionsize" to cap the average region size when
+106 * autobalance is enabled; the default maximum average region size is 8G.
+107 */
+108@InterfaceAudience.Public
+109public abstract class TableInputFormatBase
+110 extends InputFormat<ImmutableBytesWritable, Result> {
+111
+112 private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+113
+114 private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+115 "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+116 "method";
+117 private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+118 " previous error. Please look at the previous logs lines from" +
+119 " the task's full log for more details.";
+120
+121 /** Specify whether to enable auto-balance when setting the number of mappers in M/R jobs. */
+122 public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
+123 /** In auto-balance, input is split by average region size; if the calculated average region size is too big, set this to cap it. */
+124 public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";
+125
+126 /** Set the number of Mappers for each region; all regions get the same number of Mappers */
+127 public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region";
+128
+129
+130 /** Holds the details for the internal scanner.
+131 *
+132 * @see Scan */
+133 private Scan scan = null;
+134 /** The {@link Admin}. */
+135 private Admin admin;
+136 /** The {@link Table} to scan. */
+137 private Table table;
+138 /** The {@link RegionLocator} of the table.
*/ +139 private RegionLocator regionLocator; +140 /** The reader scanning the table, can be a custom one. */ +141 private TableRecordReader tableRecordReader = null; +142 /** The underlying {@link Connection} of the table. */ +143 private Connection connection; +144 +145 +146 /** The reverse DNS lookup cache mapping: IPAddress => HostName */ +147 private HashMap<InetAddress, String> reverseDNSCacheMap = +148 new HashMap<>(); +149 +150 /** +151 * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses +152 * the default. +153 * +154 * @param split The split to work with. +155 * @param context The current context. +156 * @return The newly created record reader. +157 * @throws IOException When creating the reader fails. +158 * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( +159 * org.apache.hadoop.mapreduce.InputSplit, +160 * org.apache.hadoop.mapreduce.TaskAttemptContext) +161 */ +162 @Override +163 public RecordReader<ImmutableBytesWritable, Result> createRecordReader( +164 InputSplit split, TaskAttemptContext context) +165 throws IOException { +166 // Just in case a subclass is relying on JobConfigurable magic. +167 if (table == null) { +168 initialize(context); +169 } +170 // null check in case our child overrides getTable to not throw. +171 try { +172 if (getTable() == null) { +173 // initialize() must not have been implemented in the subclass. +174 throw new IOException(INITIALIZATION_ERROR); +175 } +176 } catch (IllegalStateException exception) { +177 throw new IOException(INITIALIZATION_ERROR, exception); +178 } +179 TableSplit tSplit = (TableSplit) split; +180 LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes."); +181 final TableRecordReader trr = +182 this.tableRecordReader != null ? 
this.tableRecordReader : new TableRecordReader(); +183 Scan sc = new Scan(this.scan); +184 sc.setStartRow(tSplit.getStartRow()); +185 sc.setStopRow(tSplit.getEndRow()); +186 trr.setScan(sc); +187 trr.setTable(getTable()); +188 return new RecordReader<ImmutableBytesWritable, Result>() { 189 190 @Override -191 public Result getCurrentValue() throws IOException, InterruptedException { -192 return trr.getCurrentValue(); -193 } -194 -195 @Override -196 public float getProgress() throws IOException, InterruptedException { -197 return trr.getProgress(); -198 } -199 -200 @Override -201 public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, -202 InterruptedException { -203 trr.initialize(inputsplit, context); +191 public void close() throws IOException { +192 trr.close(); +193 closeTable(); +194 } +195 +196 @Override +197 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { +198 return trr.getCurrentKey(); +199 } +200 +201 @Override +202 public Result getCurrentValue() throws IOException, InterruptedException { +203 return trr.getCurrentValue(); 204 } 205 206 @Override -207 public boolean nextKeyValue() throws IOException, InterruptedException { -208 return trr.nextKeyValue(); +207 public float getProgress() throws IOException, InterruptedException { +208 return trr.getProgress(); 209 } -210 }; -211 } -212 -213 protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { -214 return getRegionLocator().getStartEndKeys(); -215 } +210 +211 @Override +212 public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, +213 InterruptedException { +214 trr.initialize(inputsplit, context); +215 } 216 -217 /** -218 * Calculates the splits that will serve as input for the map tasks. The -219 * number of splits matches the number of regions in a table. -220 * -221 * @param context The current job context. -222 * @return The list of input splits. -223 * @throws IOException When creating the list of splits fails. -224 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( -225 * org.apache.hadoop.mapreduce.JobContext) -226 */ -227 @Override -228 public List<InputSplit> getSplits(JobContext context) throws IOException { -229 boolean closeOnFinish = false; -230 -231 // Just in case a subclass is relying on JobConfigurable magic. -232 if (table == null) { -233 initialize(context); -234 closeOnFinish = true; -235 } -236 -237 // null check in case our child overrides getTable to not throw. -238 try { -239 if (getTable() == null) { -240 // initialize() must not have been implemented in the subclass. -241 throw new IOException(INITIALIZATION_ERROR); -242 } -243 } catch (IllegalStateException exception) { -244 throw new IOException(INITIALIZATION_ERROR, exception); -245 } -246 +217 @Override +218 public boolean nextKeyValue() throws IOException, InterruptedException { +219 return trr.nextKeyValue(); +220 } +221 }; +222 } +223 +224 protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { +225 return getRegionLocator().getStartEndKeys(); +226 } +227 +228 /** +229 * Calculates the splits that will serve as input for the map tasks. +230 * @param context The current job context. +231 * @return The list of input splits. +232 * @throws IOException When creating the list of splits fails. 
+233 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( +234 * org.apache.hadoop.mapreduce.JobContext) +235 */ +236 @Override +237 public List<InputSplit> getSplits(JobContext context) throws IOException { +238 boolean closeOnFinish = false; +239 +240 // Just in case a subclass is relying on JobConfigurable magic. +241 if (table == null) { +242 initialize(context); +243 closeOnFinish = true; +244 } +245 +246 // null check in case our child overrides getTable to not throw. 247 try { -248 RegionSizeCalculator sizeCalculator = -249 new RegionSizeCalculator(getRegionLocator(), getAdmin()); -250 -251 TableName tableName = getTable().getName(); -252 -253 Pair<byte[][], byte[][]> keys = getStartEndKeys(); -254 if (keys == null || keys.getFirst() == null || -255 keys.getFirst().length == 0) { -256 HRegionLocation regLoc = -257 getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); -258 if (null == regLoc) { -259 throw new IOException("Expecting at least one region."); -260 } -261 List<InputSplit> splits = new ArrayList<>(1); -262 long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); -263 TableSplit split = new TableSplit(tableName, scan, -264 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc -265 .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); -266 splits.add(split); -267 return splits; +248 if (getTable() == null) { +249 // initialize() must not have been implemented in the subclass. +250 throw new IOException(INITIALIZATION_ERROR); +251 } +252 } catch (IllegalStateException exception) { +253 throw new IOException(INITIALIZATION_ERROR, exception); +254 } +255 +256 try { +257 List<InputSplit> splits = oneInputSplitPerRegion(); +258 +259 // set same number of mappers for each region +260 if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) { +261 int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1); +262 List<InputSplit> res = new ArrayList<>(); +263 for (int i = 0; i < splits.size(); i++) { +264 List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion); +265 res.addAll(tmp); +266 } +267 return res; 268 } -269 List<InputSplit> splits = new ArrayList<>(keys.getFirst().length); -270 for (int i = 0; i < keys.getFirst().length; i++) { -271 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { -272 continue; -273 } -274 -275 byte[] startRow = scan.getStartRow(); -276 byte[] stopRow = scan.getStopRow(); -277 // determine if the given start an stop key fall into the region -278 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || -279 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && -280 (stopRow.length == 0 || -281 Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { -282 byte[] splitStart = startRow.length == 0 || -283 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? -284 keys.getFirst()[i] : startRow; -285 byte[] splitStop = (stopRow.length == 0 || -286 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && -287 keys.getSecond()[i].length > 0 ? -288 keys.getSecond()[i] : stopRow; -289 -290 HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); -291 // The below InetSocketAddress creation does a name resolution. 
-292 InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); -293 if (isa.isUnresolved()) { -294 LOG.warn("Failed resolve " + isa); -295 } -296 InetAddress regionAddress = isa.getAddress(); -297 String regionLocation; -298 regionLocation = reverseDNS(regionAddress); -299 -300 byte[] regionName = location.getRegionInfo().getRegionName(); -301 String encodedRegionName = location.getRegionInfo().getEncodedName(); -302 long regionSize = sizeCalculator.getRegionSize(regionName); -303 TableSplit split = new TableSplit(tableName, scan, -304 splitStart, splitStop, regionLocation, encodedRegionName, regionSize); -305 splits.add(split); -306 if (LOG.isDebugEnabled()) { -307 LOG.debug("getSplits: split -> " + i + " -> " + split); -308 } -309 } -310 } -311 //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled. -312 boolean enableAutoBalance = context.getConfiguration() -313 .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false); -314 if (enableAutoBalance) { -315 long totalRegionSize=0; -316 for (int i = 0; i < splits.size(); i++){ -317 TableSplit ts = (TableSplit)splits.get(i); -318 totalRegionSize += ts.getLength(); -319 } -320 long averageRegionSize = totalRegionSize / splits.size(); -321 // the averageRegionSize must be positive. -322 if (averageRegionSize <= 0) { -323 LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " + -324 "set it to 1."); -325 averageRegionSize = 1; -326 } -327 return calculateRebalancedSplits(splits, context, averageRegionSize); -328 } else { -329 return splits; -330 } -331 } finally { -332 if (closeOnFinish) { -333 closeTable(); -334 } -335 } -336 } -337 -338 String reverseDNS(InetAddress ipAddress) throws UnknownHostException { -339 String hostName = this.reverseDNSCacheMap.get(ipAddress); -340 if (hostName == null) { -341 String ipAddressString = null; -342 try { -343 ipAddressString = DNS.reverseDns(ipAddress, null); -344 } catch (Exception e) { -345 // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the -346 // name service. Also, in case of ipv6, we need to use the InetAddress since resolving -347 // reverse DNS using jndi doesn't work well with ipv6 addresses. -348 ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName(); -349 } -350 if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress); -351 hostName = Strings.domainNamePointerToHostName(ipAddressString); -352 this.reverseDNSCacheMap.put(ipAddress, hostName); -353 } -354 return hostName; -355 } -356 -357 /** -358 * Calculates the number of MapReduce input splits for the map tasks. The number of -359 * MapReduce input splits depends on the average region size and the "data skew ratio" user set in -360 * configuration. -361 * -362 * @param list The list of input splits before balance. -363 * @param context The current job context. -364 * @param average The average size of all regions . -365 * @return The list of input splits. -366 * @throws IOException When creating the list of splits fails. 
-367 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( -368 * org.apache.hadoop.mapreduce.JobContext) -369 */ -370 private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context, -371 long average) throws IOException { -372 List<InputSplit> resultList = new ArrayList<>(); -373 Configuration conf = context.getConfiguration(); -374 //The default data skew ratio is 3 -375 long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3); -376 //It determines which mode to use: text key mode or binary key mode. The default is text mode. -377 boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true); -378 long dataSkewThreshold = dataSkewRatio * average; -379 int count = 0; -380 while (count < list.size()) { -381 TableSplit ts = (TableSplit)list.get(count); -382 TableName tableName = ts.getTable(); -383 String regionLocation = ts.getRegionLocation(); -384 String encodedRegionName = ts.getEncodedRegionName(); -385 long regionSize = ts.getLength(); -386 if (regionSize >= dataSkewThreshold) { -387 // if the current region size is large than the data skew threshold, -388 // split the region into two MapReduce input splits. -389 byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey); -390 if (Arrays.equals(ts.getEndRow(), splitKey)) { -391 // Not splitting since the end key is the same as the split key -392 resultList.add(ts); -393 } else { -394 //Set the size of child TableSplit as 1/2 of the region size. The exact size of the -395 // MapReduce input splits is not far off. -396 TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, -397 regionLocation, regionSize / 2); -398 TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation, -399 regionSize - regionSize / 2); -400 resultList.add(t1); -401 resultList.add(t2); -402 } -403 count++; -404 } else if (regionSize >= average) { -405 // if the region size between average size and data skew threshold size, -406 // make this region as one MapReduce input split. -407 resultList.add(ts); -408 count++; -409 } else { -410 // if the total size of several small continuous regions less than the average region size, -411 // combine them into one MapReduce input split. -412 long totalSize = regionSize; -413 byte[] splitStartKey = ts.getStartRow(); -414 byte[] splitEndKey = ts.getEndRow(); -415 count++; -416 for (; count < list.size(); count++) { -417 TableSplit nextRegion = (TableSplit)list.get(count); -418 long nextRegionSize = nextRegion.getLength(); -419 if (totalSize + nextRegionSize <= dataSkewThreshold) { -420 totalSize = totalSize + nextRegionSize; -421 splitEndKey = nextRegion.getEndRow(); -422 } else { -423 break; -424 } -425 } -426 TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, -427 regionLocation, encodedRegionName, totalSize); -428 resultList.add(t); -429 } -430 } -431 return resultList; -432 } -433 -434 /** -435 * select a split point in the region. The selection of the split point is based on an uniform -436 * distribution assumption for the keys in a region. 
-437 * Here are some examples: -438 * -439 * <table> -440 * <tr> -441 * <th>start key</th> -442 * <th>end key</th> -443 * <th>is text</th> -444 * <th>split point</th> -445 * </tr> -446 * <tr> -447 * <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td> -448 * <td>'a', 'a', 'a', 'f', 'f', 'f'</td> -449 * <td>true</td> -450 * <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td> -451 * </tr> -452 * <tr> -453 * <td>'1', '1', '1', '0', '0', '0'</td> -454 * <td>'1', '1', '2', '5', '7', '9', '0'</td> -455 * <td>true</td> -456 * <td>'1', '1', '1', -78, -77, -76, -104</td> -457 * </tr> -458 * <tr> -459 * <td>'1', '1', '1', '0'</td> -460 * <td>'1', '1', '2', '0'</td> -461 * <td>true</td> -462 * <td>'1', '1', '1', -80</td> -463 * </tr> -464 * <tr> -465 * <td>13, -19, 126, 127</td> -466 * <td>13, -19, 127, 0</td> -467 * <td>false</td> -468 * <td>13, -19, 126, -65</td> -469 * </tr> -470 * </table> -471 * -472 * Set this function as "public static", make it easier for test. -473 * -474 * @param start Start key of the region -475 * @param end End key of the region -476 * @param isText It determines to use text key mode or binary key mode -477 * @return The split point in the region. -478 */ -479 @InterfaceAudience.Private -480 public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) { -481 byte upperLimitByte; -482 byte lowerLimitByte; -483 //Use text mode or binary mode. -484 if (isText) { -485 //The range of text char set in ASCII is [32,126], the lower limit is space and the upper -486 // limit is '~'. -487 upperLimitByte = '~'; -488 lowerLimitByte = ' '; -489 } else { -490 upperLimitByte = -1; -491 lowerLimitByte = 0; -492 } -493 // For special case -494 // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h" -495 // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~" -496 if (start.length == 0 && end.length == 0){ -497 return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)}; -498 } -499 if (start.length == 0 && end.length != 0){ -500 return new byte[]{ end[0] }; -501 } -502 if (start.length != 0 && end.length == 0){ -503 byte[] result =new byte[start.length]; -504 result[0]=start[0]; -505 for (int k = 1; k < start.length; k++){ -506 result[k] = upperLimitByte; -507 } -508 return result; -509 } -510 return Bytes.split(start, end, false, 1)[1]; -511 } -512 -513 /** -514 * Test if the given region is to be included in the InputSplit while splitting -515 * the regions of a table. -516 * <p> -517 * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, -518 * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br> -519 * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, -520 * continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys. -521 * <br> -522 * <br> -523 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. +269 +270 //The default value of "hbase.mapreduce.input.autobalance" is false. 
+271 if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
+272 long maxAveRegionSize = context.getConfiguration().getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824L);
+273 return calculateAutoBalancedSplits(splits, maxAveRegionSize);
+274 }
+275
+276 // return one mapper per region
+277 return splits;
+278 } finally {
+279 if (closeOnFinish) {
+280 closeTable();
+281 }
+282 }
+283 }
+284
+285 /**
+286 * Create one InputSplit per region
+287 *
+288 * @return The list of InputSplit for all the regions
+289 * @throws IOException When creating the list of splits fails.
+290 */
+291 private List<InputSplit> oneInputSplitPerRegion() throws IOException {
+292 RegionSizeCalculator sizeCalculator =
+293 new RegionSizeCalculator(getRegionLocator(), getAdmin());
+294
+295 TableName tableName = getTable().getName();
+296
+297 Pair<byte[][], byte[][]> keys = getStartEndKeys();
+298 if (keys == null || keys.getFirst() == null ||
+299 keys.getFirst().length == 0) {
+300 HRegionLocation regLoc =
+301 getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+302 if (null == regLoc) {
+303 throw new IOException("Expecting at least one region.");
+304 }
+305 List<InputSplit> splits = new ArrayList<>(1);
+306 long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
+307 TableSplit split = new TableSplit(tableName, scan,
+308 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+309 .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
+310 splits.add(split);
+311 return splits;
+312 }
+313 List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
+314 for (int i = 0; i < keys.getFirst().length; i++) {
+315 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+316 continue;
+317 }
+318
+319 byte[] startRow = scan.getStartRow();
+320 byte[] stopRow = scan.getStopRow();
+321 // determine if the given start and stop keys fall into the region
+322 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+323 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+324 (stopRow.length == 0 ||
+325 Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+326 byte[] splitStart = startRow.length == 0 ||
+327 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+328 keys.getFirst()[i] : startRow;
+329 byte[] splitStop = (stopRow.length == 0 ||
+330 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+331 keys.getSecond()[i].length > 0 ?
+332 keys.getSecond()[i] : stopRow;
+333
+334 HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
+335 // The below InetSocketAddress creation does a name resolution.
+336 InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
+337 if (isa.isUnresolved()) {
+338 LOG.warn("Failed resolve " + isa);
+339 }
+340 InetAddress regionAddress = isa.getAddress();
+341 String regionLocation;
+342 regionLocation = reverseDNS(regionAddress);
+343
+344 byte[] regionName = location.getRegionInfo().getRegionName();
+345 String encodedRegionName = location.getRegionInfo().getEncodedName();
+346 long regionSize = sizeCalculator.getRegionSize(regionName);
+347 TableSplit split = new TableSplit(tableName, scan,
+348 splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
+349 splits.add(split);
+350 if (LOG.isDebugEnabled()) {
+351 LOG.debug("getSplits: split -> " + i + " -> " + split);
+352 }
+353 }
+354 }
+355 return splits;
+356 }
+357
+358 /**
+359 * Creates n splits for one InputSplit; for now, only a uniform distribution is supported.
+360 * @param split A TableSplit corresponding to a range of rowkeys
+361 * @param n Number of ranges after splitting. Pass 1 to leave the range unsplit;
+362 * pass 2 to split the range in two, and so on.
+363 * @return A list of TableSplit, the size of the list is n
+364 * @throws IllegalArgumentIOException
+365 */
+366 protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
+367 throws IllegalArgumentIOException {
+368 if (split == null || !(split instanceof TableSplit)) {
+369 throw new IllegalArgumentIOException(
+370 "InputSplit for createNInputSplitsUniform can not be null "
+371 + "and should be an instance of TableSplit");
+372 }
+373 //if n < 1, then still continue using n = 1
+374 n = n < 1 ? 1 : n;
+375 List<InputSplit> res = new ArrayList<>(n);
+376 if (n == 1) {
+377 res.add(split);
+378 return res;
+379 }
+380
+381 // Collect Region related information
+382 TableSplit ts = (TableSplit) split;
+383 TableName tableName = ts.getTable();
+384 String regionLocation = ts.getRegionLocation();
+385 String encodedRegionName = ts.getEncodedRegionName();
+386 long regionSize = ts.getLength();
+387 byte[] startRow = ts.getStartRow();
+388 byte[] endRow = ts.getEndRow();
+389
+390 // For special case: startRow or endRow is empty
+391 if (startRow.length == 0 && endRow.length == 0){
+392 startRow = new byte[1];
+393 endRow = new byte[1];
+394 startRow[0] = 0;
+395 endRow[0] = -1;
+396 }
+397 if (startRow.length == 0 && endRow.length != 0){
+398 startRow = new byte[1];
+399 startRow[0] = 0;
+400 }
+401 if (startRow.length != 0 && endRow.length == 0){
+402 endRow = new byte[startRow.length];
+403 for (int k = 0; k < startRow.length; k++){
+404 endRow[k] = -1;
+405 }
+406 }
+407
+408 // Split Region into n chunks evenly
+409 byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
+410 for (int i = 0; i < splitKeys.length - 1; i++) {
+411 // note that the regionSize parameter may not be very accurate
+412 TableSplit tsplit =
+413 new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
+414 encodedRegionName, regionSize / n);
+415 res.add(tsplit);
+416 }
+417 return res;
+418 }
+419 /**
+420 * Calculates the number of MapReduce input splits for the map tasks. The number of
+421 * MapReduce input splits depends on the average region size.
+422 * Made 'public' for testing
+423 *
+424 * @param splits The list of input splits before balance.
+425 * @param maxAverageRegionSize maximum average region size for one mapper
+426 * @return The list of input splits.
+427 * @throws IOException When creating the list of splits fails.
+428 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+429 * org.apache.hadoop.mapreduce.JobContext)
+430 */
+431 public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize)
+432 throws IOException {
+433 if (splits.size() == 0) {
+434 return splits;
+435 }
+436 List<InputSplit> resultList = new ArrayList<>();
+437 long totalRegionSize = 0;
+438 for (int i = 0; i < splits.size(); i++) {
+439 TableSplit ts = (TableSplit) splits.get(i);
+440 totalRegionSize += ts.getLength();
+441 }
+442 long averageRegionSize = totalRegionSize / splits.size();
+443 // totalRegionSize might overflow, and the averageRegionSize must be positive.
+444 if (averageRegionSize <= 0) {
+445 LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
+446 "setting it to Long.MAX_VALUE / " + splits.size());
+447 averageRegionSize = Long.MAX_VALUE / splits.size();
+448 }
+449 // if averageRegionSize is too big, cap it at maxAverageRegionSize (8G by default)
+450 if (averageRegionSize > maxAverageRegionSize) {
+451 averageRegionSize = maxAverageRegionSize;
+452 }
+453 // if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' regions;
+454 // set default as 16M = (default hdfs block size) / 4;
+455 if (averageRegionSize < 16 * 1048576) {
+456 return splits;
+457 }
+458 for (int i = 0; i < splits.size(); i++) {
+459 TableSplit ts = (TableSplit) splits.get(i);
+460 TableName tableName = ts.getTable();
+461 String regionLocation = ts.getRegionLocation();
+462 String encodedRegionName = ts.getEncodedRegionName();
+463 long regionSize = ts.getLength();
+464
+465 if (regionSize >= averageRegionSize) {
+466 // split this region into multiple MapReduce input splits.
+467 int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
+468 List<InputSplit> temp = createNInputSplitsUniform(ts, n);
+469 resultList.addAll(temp);
+470 } else {
+471 // if the total size of several small continuous regions is less than the average region size,
+472 // combine them into one MapReduce input split.
+473 long totalSize = regionSize;
+474 byte[] splitStartKey = ts.getStartRow();
+475 byte[] splitEndKey = ts.getEndRow();
+476 int j = i + 1;
+477 while (j < splits.size()) {
+478 TableSplit nextRegion = (TableSplit) splits.get(j);
+479 long nextRegionSize = nextRegion.getLength();
+480 if (totalSize + nextRegionSize <= averageRegionSize) {
+481 totalSize = totalSize + nextRegionSize;
+482 splitEndKey = nextRegion.getEndRow();
+483 j++;
+484 } else {
+485 break;
+486 }
+487 }
+488 i = j - 1;
+489 TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
+490 encodedRegionName, totalSize);
+491 resultList.add(t);
+492 }
+493 }
+494 return resultList;
+495 }
+496
+497 String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
+498 String hostName = this.reverseDNSCacheMap.get(ipAddress);
+499 if (hostName == null) {
+500 String ipAddressString = null;
+501 try {
+5
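
----------------------------------------------------------------------

The class comment above introduces three configuration properties. For reference, they would be set on the job's Configuration before the job is submitted; a minimal sketch in Java (the class name and the chosen values are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class TableInputTuningSketch {
  public static Configuration tunedConf() {
    Configuration conf = HBaseConfiguration.create();
    // Option A: a fixed number of mappers per region; setting this property
    // disables the autobalance path in getSplits().
    conf.setInt("hbase.mapreduce.tableinput.mappers.per.region", 2);
    // Option B: let the input format balance mappers by region size instead,
    // capping the average region size used for balancing at 4G (default 8G).
    // conf.setBoolean("hbase.mapreduce.tif.input.autobalance", true);
    // conf.setLong("hbase.mapreduce.tif.ave.regionsize", 4L * 1024 * 1024 * 1024);
    return conf;
  }
}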
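When autobalance is enabled, calculateAutoBalancedSplits() assigns an oversized region n mappers with n = round(ln(regionSize / averageRegionSize) + 1); Math.log is the natural logarithm. A worked example with assumed sizes (one 40G region against the default 8G cap):

public class AutoBalanceMath {
  public static void main(String[] args) {
    long averageRegionSize = 8L * 1024 * 1024 * 1024;  // 8G cap, the default
    long regionSize = 40L * 1024 * 1024 * 1024;        // one 40G region, assumed for illustration
    // ln(40/8) + 1 = ln(5) + 1 = 2.609..., which rounds to 3 uniform splits.
    int n = (int) Math.round(Math.log((double) regionSize / (double) averageRegionSize) + 1.0);
    System.out.println(n);  // prints 3
  }
}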
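createNInputSplitsUniform() widens an empty start or end row to 0x00/0xFF bytes and then asks Bytes.split for n - 1 intermediate keys, i.e. n + 1 boundaries in total, so adjacent boundary pairs form the n child ranges. A sketch of just that boundary computation, using a hypothetical two-byte row-key range:

import org.apache.hadoop.hbase.util.Bytes;

public class UniformBoundarySketch {
  public static void main(String[] args) {
    int n = 4;  // number of child splits requested, assumed for illustration
    byte[] startRow = new byte[] { 0x00, 0x00 };
    byte[] endRow = new byte[] { (byte) 0xFF, (byte) 0xFF };
    // As in the patch: interpolate n - 1 keys between startRow and endRow,
    // giving n + 1 boundaries; each adjacent pair is one child split range.
    byte[][] keys = Bytes.split(startRow, endRow, true, n - 1);
    for (int i = 0; i < keys.length - 1; i++) {
      System.out.println("[" + Bytes.toStringBinary(keys[i]) + ", "
          + Bytes.toStringBinary(keys[i + 1]) + ")");
    }
  }
}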