From: stack@apache.org
To: commits@hbase.apache.org
Subject: hbase git commit: HBASE-18090 Improve TableSnapshotInputFormat to allow multiple mappers per region
Date: Tue, 28 Nov 2017 22:50:56 +0000 (UTC)

Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 dca65353d -> b1912790f

HBASE-18090 Improve TableSnapshotInputFormat to allow multiple mappers per region

Signed-off-by: Michael Stack

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b1912790
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b1912790
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b1912790

Branch: refs/heads/branch-1.3
Commit: b1912790f9c057e51fd7e6dde924bfd0f6e784cc
Parents: dca6535
Author: libisthanks
Authored: Thu Nov 9 10:53:22 2017 +0800
Committer: Michael Stack
Committed: Tue Nov 28 14:50:40 2017 -0800

----------------------------------------------------------------------
 ...IntegrationTestTableSnapshotInputFormat.java |   4 +-
 .../hbase/client/ClientSideRegionScanner.java   |   2 +
 .../hadoop/hbase/mapred/TableMapReduceUtil.java |  38 +++++++
 .../hbase/mapred/TableSnapshotInputFormat.java  |  18 +++
 .../hbase/mapreduce/TableMapReduceUtil.java     |  40 +++++++
 .../mapreduce/TableSnapshotInputFormat.java     |  24 +++-
 .../mapreduce/TableSnapshotInputFormatImpl.java | 112 +++++++++++++++++--
 .../hadoop/hbase/util/RegionSplitter.java       |  70 ++++++++++++
 .../mapred/TestTableSnapshotInputFormat.java    |  41 ++++---
 .../TableSnapshotInputFormatTestBase.java       |  23 ++--
 .../mapreduce/TestTableSnapshotInputFormat.java |  35 ++++--
 .../hadoop/hbase/util/TestRegionSplitter.java   |  20 ++++
 12 files changed, 380 insertions(+), 47 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java index 1a152e8..2df1c4b 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java @@ -151,7 +151,7 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions; org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util, - tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, + tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, 1, expectedNumSplits, false); } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) { /* @@ -165,7 +165,7 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase int expectedNumSplits = numRegions; org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util, - tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, + tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, 1, expectedNumSplits, false); } else { throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr +"."); http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index dde2f10..ef89c32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -56,6 +56,8 @@ public class ClientSideRegionScanner extends AbstractClientScanner { // region is immutable, set isolation level scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + htd.setReadOnly(true); + // open region from the snapshot directory this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java index d5f225f..476c1a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.mapreduce.ResultSerialization; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.TokenUtil; +import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.Text; @@ -194,6 +195,43 @@ public class TableMapReduceUtil { } /** + * Sets up the job for reading from a 
table snapshot. It bypasses hbase servers + * and read directly from snapshot files. + * + * @param snapshotName The name of the snapshot (of a table) to read from. + * @param columns The columns to scan. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param jobConf The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restore directory can be deleted. + * @param splitAlgo algorithm to split + * @param numSplitsPerRegion how many input splits to generate per one region + * @throws IOException When setting up the details fails. + * @see TableSnapshotInputFormat + */ + public static void initTableSnapshotMapJob(String snapshotName, String columns, + Class mapper, + Class outputKeyClass, + Class outputValueClass, JobConf jobConf, + boolean addDependencyJars, Path tmpRestoreDir, + RegionSplitter.SplitAlgorithm splitAlgo, + int numSplitsPerRegion) + throws IOException { + TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo, + numSplitsPerRegion); + initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf, + addDependencyJars, TableSnapshotInputFormat.class); + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf); + } + + + /** * Use this before submitting a TableReduce job. It will * appropriately set up the JobConf. 
* http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java index a5c62b2..b9e0a03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java @@ -27,11 +27,13 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; +import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; import java.io.DataInput; import java.io.DataOutput; @@ -165,4 +167,20 @@ public class TableSnapshotInputFormat implements InputFormat mapper, + Class outputKeyClass, + Class outputValueClass, Job job, + boolean addDependencyJars, Path tmpRestoreDir, + RegionSplitter.SplitAlgorithm splitAlgo, + int numSplitsPerRegion) + throws IOException { + TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo, + numSplitsPerRegion); + initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); + resetCacheConfig(job.getConfiguration()); + } + + + + /** * Use this before submitting a Multi TableMap job. It will appropriately set * up the job. * http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index c40396f..dce311d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -66,8 +67,10 @@ import java.util.List; * } * *

- * Internally, this input format restores the snapshot into the given tmp directory. Similar to - * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading + * Internally, this input format restores the snapshot into the given tmp directory. By default, + * and similar to {@link TableInputFormat} an InputSplit is created per region, but optionally you + * can run N mapper tasks per every region, in which case the region key range will be split to + * N sub-ranges and an InputSplit will be created per sub-range. The region is opened for reading * from each RecordReader. An internal RegionScanner is used to execute the * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user. *

@@ -204,4 +207,21 @@ public class TableSnapshotInputFormat extends InputFormat) + Class.forName(splitAlgoClassName)).newInstance(); + } catch (ClassNotFoundException e) { + throw new IOException("SplitAlgo class " + splitAlgoClassName + + " is not found", e); + } catch (InstantiationException e) { + throw new IOException("SplitAlgo class " + splitAlgoClassName + + " is not instantiable", e); + } catch (IllegalAccessException e) { + throw new IOException("SplitAlgo class " + splitAlgoClassName + + " is not instantiable", e); + } } public static List getRegionInfosFromManifest(SnapshotManifest manifest) { @@ -305,6 +340,12 @@ public class TableSnapshotInputFormatImpl { public static List getSplits(Scan scan, SnapshotManifest manifest, List regionManifests, Path restoreDir, Configuration conf) throws IOException { + return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1); + } + + public static List getSplits(Scan scan, SnapshotManifest manifest, + List regionManifests, Path restoreDir, + Configuration conf, RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException { // load table descriptor HTableDescriptor htd = manifest.getTableDescriptor(); @@ -317,16 +358,36 @@ public class TableSnapshotInputFormatImpl { continue; } - if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), - hri.getEndKey())) { - // compute HDFS locations from snapshot files (which will get the locations for - // referred hfiles) - List hosts = getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); - - int len = Math.min(3, hosts.size()); - hosts = hosts.subList(0, len); - splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); + if (numSplits > 1) { + byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true); + for (int i = 0; i < sp.length - 1; i++) { + if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], + sp[i + 1])) { + // compute HDFS locations from snapshot files (which will get the locations for + // referred hfiles) + List hosts = getBestLocations(conf, + HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + + int len = Math.min(3, hosts.size()); + hosts = hosts.subList(0, len); + Scan boundedScan = new Scan(scan); + boundedScan.setStartRow(sp[i]); + boundedScan.setStopRow(sp[i + 1]); + splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir)); + } + } + } else { + if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), + hri.getEndKey())) { + // compute HDFS locations from snapshot files (which will get the locations for + // referred hfiles) + List hosts = getBestLocations(conf, + HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + + int len = Math.min(3, hosts.size()); + hosts = hosts.subList(0, len); + splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); + } } } @@ -395,6 +456,35 @@ public class TableSnapshotInputFormatImpl { */ public static void setInput(Configuration conf, String snapshotName, Path restoreDir) throws IOException { + setInput(conf, snapshotName, restoreDir, null, 1); + } + + /** + * Configures the job to use TableSnapshotInputFormat to read from a snapshot. + * @param conf the job to configure + * @param snapshotName the name of the snapshot to read from + * @param restoreDir a temporary directory to restore the snapshot into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. 
+ * After the job is finished, restoreDir can be deleted. + * @param numSplitsPerRegion how many input splits to generate per one region + * @param splitAlgo SplitAlgorithm to be used when generating InputSplits + * @throws IOException if an error occurs + */ + public static void setInput(Configuration conf, String snapshotName, Path restoreDir, + RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) + throws IOException { + conf.set(SNAPSHOT_NAME_KEY, snapshotName); + if (numSplitsPerRegion < 1) { + throw new IllegalArgumentException("numSplits must be >= 1, " + + "illegal numSplits : " + numSplitsPerRegion); + } + if (splitAlgo == null && numSplitsPerRegion > 1) { + throw new IllegalArgumentException("Split algo can't be null when numSplits > 1"); + } + if (splitAlgo != null) { + conf.set(SPLIT_ALGO, splitAlgo.getClass().getName()); + } + conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion); conf.set(SNAPSHOT_NAME_KEY, snapshotName); Path rootDir = FSUtils.getRootDir(conf); http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java index ea704f8..2ba1673 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java @@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -179,6 +180,17 @@ public class RegionSplitter { byte[][] split(int numRegions); /** + * Some MapReduce jobs may want to run multiple mappers per region, + * this is intended for such usecase. + * + * @param start first row (inclusive) + * @param end last row (exclusive) + * @param numSplits number of splits to generate + * @param inclusive whether start and end are returned as split points + */ + byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive); + + /** * In HBase, the first row is represented by an empty byte array. This might * cause problems with your split algorithm or row printing. All your APIs * will be passed firstRow() instead of empty array. @@ -921,6 +933,39 @@ public class RegionSplitter { return convertToBytes(splits); } + @Override + public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) { + BigInteger s = convertToBigInteger(start); + BigInteger e = convertToBigInteger(end); + + Preconditions.checkArgument(e.compareTo(s) > 0, + "last row (%s) is configured less than first row (%s)", rowToStr(end), + end); + // +1 to range because the last row is inclusive + BigInteger range = e.subtract(s).add(BigInteger.ONE); + Preconditions.checkState(range.compareTo(BigInteger.valueOf(numSplits)) >= 0, + "split granularity (%s) is greater than the range (%s)", numSplits, range); + + BigInteger[] splits = new BigInteger[numSplits - 1]; + BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(numSplits)); + for (int i = 1; i < numSplits; i++) { + // NOTE: this means the last region gets all the slop. 
+ // This is not a big deal if we're assuming n << MAXHEX + splits[i - 1] = s.add(sizeOfEachSplit.multiply(BigInteger + .valueOf(i))); + } + + if (inclusive) { + BigInteger[] inclusiveSplitPoints = new BigInteger[numSplits + 1]; + inclusiveSplitPoints[0] = convertToBigInteger(start); + inclusiveSplitPoints[numSplits] = convertToBigInteger(end); + System.arraycopy(splits, 0, inclusiveSplitPoints, 1, splits.length); + return convertToBytes(inclusiveSplitPoints); + } else { + return convertToBytes(splits); + } + } + public byte[] firstRow() { return convertToByte(firstRowInt); } @@ -1063,6 +1108,31 @@ public class RegionSplitter { return Arrays.copyOfRange(splits, 1, splits.length - 1); } + public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) { + if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) { + start = firstRowBytes; + } + if (Arrays.equals(end, HConstants.EMPTY_BYTE_ARRAY)) { + end = lastRowBytes; + } + Preconditions.checkArgument( + Bytes.compareTo(end, start) > 0, + "last row (%s) is configured less than first row (%s)", + Bytes.toStringBinary(end), + Bytes.toStringBinary(start)); + + byte[][] splits = Bytes.split(start, end, true, + numSplits - 1); + Preconditions.checkState(splits != null, + "Could not calculate input splits with given user input: " + this); + if (inclusive) { + return splits; + } else { + // remove endpoints, which are included in the splits list + return Arrays.copyOfRange(splits, 1, splits.length - 1); + } + } + @Override public byte[] firstRow() { return firstRowBytes; http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java index 60f19a2..ed98eb3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobClient; @@ -131,20 +132,20 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Test @Override public void testWithMockedMapReduceMultiRegion() throws Exception { - testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10); + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10); } @Test @Override public void testWithMapReduceMultiRegion() throws Exception { - testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false); + testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false); } @Test @Override // run the MR job while HBase is offline public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { - testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true); + testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true); } @Override @@ -158,7 +159,7 @@ 
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Override protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int expectedNumSplits) throws Exception { + int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception { setupCluster(); TableName tableName = TableName.valueOf("testWithMockedMapReduce"); try { @@ -168,9 +169,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa JobConf job = new JobConf(util.getConfiguration()); Path tmpTableDir = util.getRandomDir(); - TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, - COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, - NullWritable.class, job, false, tmpTableDir); + if (numSplitsPerRegion > 1) { + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, + COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(), + numSplitsPerRegion); + } else { + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, + COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + } // mapred doesn't support start and end keys? o.O verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); @@ -219,16 +227,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Override protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, - String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception { doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, - numRegions, expectedNumSplits, shutdownCluster); + numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster); } // this is also called by the IntegrationTestTableSnapshotInputFormat public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, - int expectedNumSplits, boolean shutdownCluster) throws Exception { + int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception { //create the table and snapshot createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions); @@ -245,9 +253,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf, TestTableSnapshotInputFormat.class); - TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS, - TestTableSnapshotMapper.class, ImmutableBytesWritable.class, - NullWritable.class, jobConf, true, tableDir); + if(numSplitsPerRegion > 1) { + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(), + numSplitsPerRegion); + } else { + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, jobConf, true, tableDir); + } jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); jobConf.setNumReduceTasks(1); 
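For reference, below is a minimal, self-contained sketch of a driver that uses the new old-API (org.apache.hadoop.hbase.mapred) overload exercised by the test above. The snapshot name, column specification, restore directory, split count, and RowKeyMapper class are hypothetical illustrations; only the initTableSnapshotMapJob(..., SplitAlgorithm, numSplitsPerRegion) signature and RegionSplitter.UniformSplit come from this patch.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class SnapshotScanOldApi {

  // Hypothetical mapper: emits every row key read from the snapshot.
  public static class RowKeyMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    @Override
    public void map(ImmutableBytesWritable row, Result result,
        OutputCollector<ImmutableBytesWritable, NullWritable> output, Reporter reporter)
        throws IOException {
      output.collect(row, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf(HBaseConfiguration.create(), SnapshotScanOldApi.class);

    // New overload from this patch: UniformSplit with 4 splits per region means
    // four map tasks each scan a disjoint key sub-range of every region.
    TableMapReduceUtil.initTableSnapshotMapJob(
        "my_snapshot",                      // hypothetical snapshot name
        "f1",                               // hypothetical column(s) to scan
        RowKeyMapper.class,
        ImmutableBytesWritable.class,
        NullWritable.class,
        jobConf,
        true,                               // ship HBase dependency jars via tmpjars
        new Path("/tmp/snapshot-restore"),  // hypothetical restore dir, outside the HBase root dir
        new RegionSplitter.UniformSplit(),
        4);

    jobConf.setNumReduceTasks(0);
    jobConf.setOutputFormat(NullOutputFormat.class);
    JobClient.runJob(jobConf);
  }
}
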
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java index 3df4a8f..9402612 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java @@ -76,10 +76,10 @@ public abstract class TableSnapshotInputFormatTestBase { } protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int expectedNumSplits) throws Exception; + int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception; protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, - String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception; protected abstract byte[] getStartRow(); @@ -88,28 +88,33 @@ public abstract class TableSnapshotInputFormatTestBase { @Test public void testWithMockedMapReduceSingleRegion() throws Exception { - testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1); + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1); } @Test public void testWithMockedMapReduceMultiRegion() throws Exception { - testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8); + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8); } @Test public void testWithMapReduceSingleRegion() throws Exception { - testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false); + testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, 1, false); } @Test public void testWithMapReduceMultiRegion() throws Exception { - testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false); + testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 8, false); + } + + @Test + public void testWithMapReduceMultipleMappersPerRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false); } @Test // run the MR job while HBase is offline public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { - testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true); + testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 8, true); } // Test that snapshot restore does not create back references in the HBase root dir. 
@@ -157,13 +162,13 @@ public abstract class TableSnapshotInputFormatTestBase { String snapshotName, Path tmpTableDir) throws Exception; protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception { + int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception { setupCluster(); try { Path tableDir = util.getRandomDir(); TableName tableName = TableName.valueOf("testWithMapReduce"); testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions, - expectedNumSplits, shutdownCluster); + numSplitsPerRegion, expectedNumSplits, shutdownCluster); } finally { tearDownCluster(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index 3531dd7..ed24495 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -195,7 +196,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Override public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int expectedNumSplits) throws Exception { + int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception { setupCluster(); TableName tableName = TableName.valueOf("testWithMockedMapReduce"); try { @@ -206,9 +207,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa Path tmpTableDir = util.getRandomDir(); Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan - TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, - scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, - NullWritable.class, job, false, tmpTableDir); + if (numSplitsPerRegion > 1) { + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(), + numSplitsPerRegion); + } else { + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + } verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); @@ -321,16 +329,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Override protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, - String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + String snapshotName, Path tableDir, int numRegions, int 
numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception { doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, - numRegions, expectedNumSplits, shutdownCluster); + numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster); } // this is also called by the IntegrationTestTableSnapshotInputFormat public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, - int expectedNumSplits, boolean shutdownCluster) throws Exception { + int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception { //create the table and snapshot createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions); @@ -348,9 +356,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), TestTableSnapshotInputFormat.class); - TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, - scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, - NullWritable.class, job, true, tableDir); + if (numSplitsPerRegion > 1) { + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, true, tableDir, new RegionSplitter.UniformSplit(), + numSplitsPerRegion); + } else { + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, true, tableDir); + } job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); job.setNumReduceTasks(1); http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java index e343588..e8600a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java @@ -162,6 +162,16 @@ public class TestRegionSplitter { // Halfway between df... and ff... should be ef.... 
splitPoint = splitter.split("dfffffff".getBytes(), lastRow); assertArrayEquals(splitPoint,"efffffff".getBytes()); + + // Check splitting region with multiple mappers per region + byte[][] splits = splitter.split("00000000".getBytes(), "30000000".getBytes(), 3, false); + assertEquals(2, splits.length); + assertArrayEquals(splits[0], "10000000".getBytes()); + assertArrayEquals(splits[1], "20000000".getBytes()); + + splits = splitter.split("00000000".getBytes(), "20000000".getBytes(), 2, true); + assertEquals(3, splits.length); + assertArrayEquals(splits[1], "10000000".getBytes()); } /** @@ -210,6 +220,16 @@ public class TestRegionSplitter { splitPoint = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'b'}); assertArrayEquals(splitPoint, new byte[] {'a', 'a', 'a', (byte)0x80 }); + + // Check splitting region with multiple mappers per region + byte[][] splits = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'd'}, 3, false); + assertEquals(2, splits.length); + assertArrayEquals(splits[0], new byte[]{'a', 'a', 'b'}); + assertArrayEquals(splits[1], new byte[]{'a', 'a', 'c'}); + + splits = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'e'}, 2, true); + assertEquals(3, splits.length); + assertArrayEquals(splits[1], new byte[] { 'a', 'a', 'c'}); } @Test
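
As a closing illustration, here is a minimal sketch of the equivalent new-API (org.apache.hadoop.hbase.mapreduce) driver, mirroring what TestTableSnapshotInputFormat above does when numSplitsPerRegion > 1. The snapshot name, restore directory, split count, and RowKeyMapper class are hypothetical; only the initTableSnapshotMapperJob(..., SplitAlgorithm, numSplitsPerRegion) signature and RegionSplitter.UniformSplit come from this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SnapshotScanNewApi {

  // Hypothetical mapper: emits every row key read from the snapshot.
  public static class RowKeyMapper
      extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable row, Result result, Context context)
        throws IOException, InterruptedException {
      context.write(row, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "snapshot-scan");
    job.setJarByClass(SnapshotScanNewApi.class);

    Scan scan = new Scan();  // optionally bound the scan with start/stop rows

    // New overload from this patch: UniformSplit with 8 splits per region produces
    // eight InputSplits (and so eight map tasks) per region, each covering a
    // sub-range of that region's key space.
    TableMapReduceUtil.initTableSnapshotMapperJob(
        "my_snapshot",                      // hypothetical snapshot name
        scan,
        RowKeyMapper.class,
        ImmutableBytesWritable.class,
        NullWritable.class,
        job,
        true,                               // ship HBase dependency jars via tmpjars
        new Path("/tmp/snapshot-restore"),  // hypothetical restore dir, outside the HBase root dir
        new RegionSplitter.UniformSplit(),
        8);

    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}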