From: git-site-role@apache.org
To: commits@hbase.apache.org
Date: Thu, 06 Sep 2018 14:54:03 -0000
Subject: [14/31] hbase-site git commit: Published site at f8b12805bbff18fc94e2ac6894f92276b1453350.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/06efc31c/testdevapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.ScanMapper.html
----------------------------------------------------------------------
diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.ScanMapper.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.ScanMapper.html
index 9716afd..3253dec 100644
--- a/testdevapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.ScanMapper.html
+++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.ScanMapper.html
@@ -31,319 +31,297 @@
 023import static org.junit.Assert.assertTrue;
 024
 025import java.io.IOException;
-026import java.util.*;
-027
-028import org.apache.hadoop.conf.Configuration;
-029import org.apache.hadoop.fs.Path;
-030import org.apache.hadoop.hbase.HBaseTestingUtility;
-031import org.apache.hadoop.hbase.TableName;
-032import org.apache.hadoop.hbase.client.Result;
-033import org.apache.hadoop.hbase.client.Scan;
-034import org.apache.hadoop.hbase.client.Table;
-035import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-036import org.apache.hadoop.hbase.util.Bytes;
-037import org.apache.hadoop.io.NullWritable;
-038import org.apache.hadoop.mapred.JobConf;
-039import org.apache.hadoop.mapreduce.InputSplit;
-040import org.apache.hadoop.mapreduce.Job;
-041import org.apache.hadoop.mapreduce.Reducer;
-042import org.apache.hadoop.mapreduce.TaskCounter;
-043import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-044import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-045import org.junit.AfterClass;
-046import org.junit.Assert;
-047import org.junit.BeforeClass;
-048import org.slf4j.Logger;
-049import org.slf4j.LoggerFactory;
-050
-051
-052/**
-053 * <p>
-054 * Tests various scan start and stop row scenarios. This is set in a scan and
-055 * tested in a MapReduce job to see if that is handed over and done properly
-056 * too.
-057 * </p>
-058 * <p>
-059 * This test is broken into two parts in order to side-step the test timeout
-060 * period of 900, as documented in HBASE-8326.
-061 * </p>
-062 */
-063public abstract class TestTableInputFormatScanBase {
-064
-065  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
-066  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-067
-068  static final TableName TABLE_NAME = TableName.valueOf("scantest");
-069  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
-070  static final String KEY_STARTROW = "startRow";
-071  static final String KEY_LASTROW = "stpRow";
-072
-073  private static Table table = null;
-074
-075  @BeforeClass
-076  public static void setUpBeforeClass() throws Exception {
-077    // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
-078    // this turns it off for this test.  TODO: Figure out why scr breaks recovery.
-079    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
-080
-081    // switch TIF to log at DEBUG level
-082    TEST_UTIL.enableDebug(TableInputFormat.class);
-083    TEST_UTIL.enableDebug(TableInputFormatBase.class);
-084    // start mini hbase cluster
-085    TEST_UTIL.startMiniCluster(3);
-086    // create and fill table
-087    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
-088    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
-089  }
-090
-091  @AfterClass
-092  public static void tearDownAfterClass() throws Exception {
-093    TEST_UTIL.shutdownMiniCluster();
-094  }
-095
-096  /**
-097   * Pass the key and value to reduce.
-098   */
-099  public static class ScanMapper
-100  extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
-101
-102    /**
-103     * Pass the key and value to reduce.
-104     *
-105     * @param key  The key, here "aaa", "aab" etc.
-106     * @param value  The value is the same as the key.
-107     * @param context  The task context.
-108     * @throws IOException When reading the rows fails.
-109     */
-110    @Override
-111    public void map(ImmutableBytesWritable key, Result value,
-112      Context context)
-113    throws IOException, InterruptedException {
-114      if (value.size() != 2) {
-115        throw new IOException("There should be two input columns");
-116      }
-117      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
-118        cfMap = value.getMap();
-119
-120      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
-121        throw new IOException("Wrong input columns. Missing: '" +
-122          Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
-123      }
-124
-125      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
-126      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
-127      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
-128        ", value -> (" + val0 + ", " + val1 + ")");
-129      context.write(key, key);
-130    }
-131  }
-132
-133  /**
-134   * Checks the last and first key seen against the scanner boundaries.
-135   */
-136  public static class ScanReducer
-137  extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
-138                  NullWritable, NullWritable> {
-139
-140    private String first = null;
-141    private String last = null;
-142
-143    protected void reduce(ImmutableBytesWritable key,
-144        Iterable<ImmutableBytesWritable> values, Context context)
-145    throws IOException ,InterruptedException {
-146      int count = 0;
-147      for (ImmutableBytesWritable value : values) {
-148        String val = Bytes.toStringBinary(value.get());
-149        LOG.info("reduce: key[" + count + "] -> " +
-150          Bytes.toStringBinary(key.get()) + ", value -> " + val);
-151        if (first == null) first = val;
-152        last = val;
-153        count++;
-154      }
-155    }
-156
-157    protected void cleanup(Context context)
-158    throws IOException, InterruptedException {
-159      Configuration c = context.getConfiguration();
-160      String startRow = c.get(KEY_STARTROW);
-161      String lastRow = c.get(KEY_LASTROW);
-162      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
-163      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
-164      if (startRow != null && startRow.length() > 0) {
-165        assertEquals(startRow, first);
-166      }
-167      if (lastRow != null && lastRow.length() > 0) {
-168        assertEquals(lastRow, last);
-169      }
-170    }
-171
-172  }
-173
-174  /**
-175   * Tests an MR Scan initialized from properties set in the Configuration.
-176   *
-177   * @throws IOException
-178   * @throws ClassNotFoundException
-179   * @throws InterruptedException
-180   */
-181  protected void testScanFromConfiguration(String start, String stop, String last)
-182  throws IOException, InterruptedException, ClassNotFoundException {
-183    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
-184      "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
-185    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
-186    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
-187    c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILYS[0]) + ", "
-188      + Bytes.toString(INPUT_FAMILYS[1]));
-189    c.set(KEY_STARTROW, start != null ? start : "");
-190    c.set(KEY_LASTROW, last != null ? last : "");
+026import java.util.ArrayList;
+027import java.util.List;
+028import java.util.Locale;
+029import java.util.Map;
+030import java.util.NavigableMap;
+031import org.apache.hadoop.conf.Configuration;
+032import org.apache.hadoop.fs.Path;
+033import org.apache.hadoop.hbase.HBaseTestingUtility;
+034import org.apache.hadoop.hbase.TableName;
+035import org.apache.hadoop.hbase.client.Result;
+036import org.apache.hadoop.hbase.client.Scan;
+037import org.apache.hadoop.hbase.client.Table;
+038import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+039import org.apache.hadoop.hbase.util.Bytes;
+040import org.apache.hadoop.io.NullWritable;
+041import org.apache.hadoop.mapred.JobConf;
+042import org.apache.hadoop.mapreduce.InputSplit;
+043import org.apache.hadoop.mapreduce.Job;
+044import org.apache.hadoop.mapreduce.Reducer;
+045import org.apache.hadoop.mapreduce.TaskCounter;
+046import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+047import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+048import org.junit.AfterClass;
+049import org.junit.Assert;
+050import org.junit.BeforeClass;
+051import org.slf4j.Logger;
+052import org.slf4j.LoggerFactory;
+053
+054
+055/**
+056 * Tests various scan start and stop row scenarios. This is set in a scan and tested in a MapReduce
+057 * job to see if that is handed over and done properly too.
+058 */
+059public abstract class TestTableInputFormatScanBase {
+060
+061  private static final Logger LOG = LoggerFactory.getLogger(TestTableInputFormatScanBase.class);
+062  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+063
+064  static final TableName TABLE_NAME = TableName.valueOf("scantest");
+065  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
+066  static final String KEY_STARTROW = "startRow";
+067  static final String KEY_LASTROW = "stpRow";
+068
+069  private static Table table = null;
+070
+071  @BeforeClass
+072  public static void setUpBeforeClass() throws Exception {
+073    // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
+074    // this turns it off for this test.  TODO: Figure out why scr breaks recovery.
+075    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+076
+077    // switch TIF to log at DEBUG level
+078    TEST_UTIL.enableDebug(TableInputFormat.class);
+079    TEST_UTIL.enableDebug(TableInputFormatBase.class);
+080    // start mini hbase cluster
+081    TEST_UTIL.startMiniCluster(3);
+082    // create and fill table
+083    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
+084    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
+085  }
+086
+087  @AfterClass
+088  public static void tearDownAfterClass() throws Exception {
+089    TEST_UTIL.shutdownMiniCluster();
+090  }
+091
+092  /**
+093   * Pass the key and value to reduce.
+094   */
+095  public static class ScanMapper
+096      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+097
+098    /**
+099     * Pass the key and value to reduce.
+100     *
+101     * @param key The key, here "aaa", "aab" etc.
+102     * @param value The value is the same as the key.
+103     * @param context The task context.
+104     * @throws IOException When reading the rows fails.
+105     */
+106    @Override
+107    public void map(ImmutableBytesWritable key, Result value,
+108      Context context)
+109    throws IOException, InterruptedException {
+110      if (value.size() != 2) {
+111        throw new IOException("There should be two input columns");
+112      }
+113      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+114        cfMap = value.getMap();
+115
+116      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
+117        throw new IOException("Wrong input columns. Missing: '" +
+118          Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
+119      }
+120
+121      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
+122      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
+123      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
+124        ", value -> (" + val0 + ", " + val1 + ")");
+125      context.write(key, key);
+126    }
+127  }
+128
+129  /**
+130   * Checks the last and first key seen against the scanner boundaries.
+131   */
+132  public static class ScanReducer
+133      extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+134          NullWritable, NullWritable> {
+135
+136    private String first = null;
+137    private String last = null;
+138
+139    protected void reduce(ImmutableBytesWritable key,
+140        Iterable<ImmutableBytesWritable> values, Context context)
+141    throws IOException ,InterruptedException {
+142      int count = 0;
+143      for (ImmutableBytesWritable value : values) {
+144        String val = Bytes.toStringBinary(value.get());
+145        LOG.info("reduce: key[" + count + "] -> " +
+146          Bytes.toStringBinary(key.get()) + ", value -> " + val);
+147        if (first == null) first = val;
+148        last = val;
+149        count++;
+150      }
+151    }
+152
+153    protected void cleanup(Context context)
+154    throws IOException, InterruptedException {
+155      Configuration c = context.getConfiguration();
+156      String startRow = c.get(KEY_STARTROW);
+157      String lastRow = c.get(KEY_LASTROW);
+158      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
+159      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
+160      if (startRow != null && startRow.length() > 0) {
+161        assertEquals(startRow, first);
+162      }
+163      if (lastRow != null && lastRow.length() > 0) {
+164        assertEquals(lastRow, last);
+165      }
+166    }
+167
+168  }
+169
+170  /**
+171   * Tests an MR Scan initialized from properties set in the Configuration.
+172   */
+173  protected void testScanFromConfiguration(String start, String stop, String last)
+174      throws IOException, InterruptedException, ClassNotFoundException {
+175    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
+176      "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
+177    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+178    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
+179    c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
+180      Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
+181    c.set(KEY_STARTROW, start != null ? start : "");
+182    c.set(KEY_LASTROW, last != null ? last : "");
+183
+184    if (start != null) {
+185      c.set(TableInputFormat.SCAN_ROW_START, start);
+186    }
+187
+188    if (stop != null) {
+189      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
+190    }
 191
-192    if (start != null) {
-193      c.set(TableInputFormat.SCAN_ROW_START, start);
-194    }
-195
-196    if (stop != null) {
-197      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
-198    }
-199
-200    Job job = new Job(c, jobName);
-201    job.setMapperClass(ScanMapper.class);
-202    job.setReducerClass(ScanReducer.class);
-203    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-204    job.setMapOutputValueClass(ImmutableBytesWritable.class);
-205    job.setInputFormatClass(TableInputFormat.class);
-206    job.setNumReduceTasks(1);
-207    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
-208    TableMapReduceUtil.addDependencyJars(job);
-209    assertTrue(job.waitForCompletion(true));
-210  }
-211
-212  /**
-213   * Tests a MR scan using specific start and stop rows.
-214   *
-215   * @throws IOException
-216   * @throws ClassNotFoundException
-217   * @throws InterruptedException
-218   */
-219  protected void testScan(String start, String stop, String last)
-220  throws IOException, InterruptedException, ClassNotFoundException {
-221    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
-222      "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
-223    LOG.info("Before map/reduce startup - job " + jobName);
-224    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
-225    Scan scan = new Scan();
-226    scan.addFamily(INPUT_FAMILYS[0]);
-227    scan.addFamily(INPUT_FAMILYS[1]);
-228    if (start != null) {
-229      scan.setStartRow(Bytes.toBytes(start));
-230    }
-231    c.set(KEY_STARTROW, start != null ? start : "");
-232    if (stop != null) {
-233      scan.setStopRow(Bytes.toBytes(stop));
-234    }
-235    c.set(KEY_LASTROW, last != null ? last : "");
-236    LOG.info("scan before: " + scan);
-237    Job job = new Job(c, jobName);
-238    TableMapReduceUtil.initTableMapperJob(
-239      TABLE_NAME, scan, ScanMapper.class,
-240      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
-241    job.setReducerClass(ScanReducer.class);
-242    job.setNumReduceTasks(1); // one to get final "first" and "last" key
-243    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
-244    LOG.info("Started " + job.getJobName());
-245    assertTrue(job.waitForCompletion(true));
-246    LOG.info("After map/reduce completion - job " + jobName);
-247  }
-248
-249
-250  /**
-251   * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX
-252   * This test does not run MR job
-253   *
-254   * @throws IOException
-255   * @throws ClassNotFoundException
-256   * @throws InterruptedException
-257   */
-258  public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
-259      InterruptedException,
-260      ClassNotFoundException {
-261    String jobName = "TestJobForNumOfSplits";
-262    LOG.info("Before map/reduce startup - job " + jobName);
-263    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
-264    Scan scan = new Scan();
-265    scan.addFamily(INPUT_FAMILYS[0]);
-266    scan.addFamily(INPUT_FAMILYS[1]);
-267    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
-268    c.set(KEY_STARTROW, "");
-269    c.set(KEY_LASTROW, "");
-270    Job job = new Job(c, jobName);
-271    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
-272      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
-273    TableInputFormat tif = new TableInputFormat();
-274    tif.setConf(job.getConfiguration());
-275    Assert.assertEquals(TABLE_NAME, table.getName());
-276    List<InputSplit> splits = tif.getSplits(job);
-277    Assert.assertEquals(expectedNumOfSplits, splits.size());
-278  }
-279
-280  /**
-281   * Run MR job to check the number of mapper = expectedNumOfSplits
-282   * @throws IOException
-283   * @throws InterruptedException
-284   * @throws ClassNotFoundException
-285   */
-286  public void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
-287      InterruptedException,
-288      ClassNotFoundException {
-289    String jobName = "TestJobForNumOfSplits-MR";
-290    LOG.info("Before map/reduce startup - job " + jobName);
-291    JobConf c = new JobConf(TEST_UTIL.getConfiguration());
-292    Scan scan = new Scan();
-293    scan.addFamily(INPUT_FAMILYS[0]);
-294    scan.addFamily(INPUT_FAMILYS[1]);
-295    c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
-296    c.set(KEY_STARTROW, "");
-297    c.set(KEY_LASTROW, "");
-298    Job job = Job.getInstance(c, jobName);
-299    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
-300      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
-301    job.setReducerClass(ScanReducer.class);
-302    job.setNumReduceTasks(1);
-303    job.setOutputFormatClass(NullOutputFormat.class);
-304    assertTrue("job failed!", job.waitForCompletion(true));
-305    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
-306    // we use TaskCounter.SHUFFLED_MAPS to get total launched maps
-307    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
-308      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
-309  }
-310
-311  /**
-312   * Run MR job to test autobalance for setting number of mappers for TIF
-313   * This does not run real MR job
-314   */
-315  public void testAutobalanceNumOfSplit() throws IOException {
-316    // set up splits for testing
-317    List<InputSplit> splits = new ArrayList<>(5);
-318    int[] regionLen = {10, 20, 20, 40, 60};
-319    for (int i = 0; i < 5; i++) {
-320      InputSplit split = new TableSplit(TABLE_NAME, new Scan(),
-321        Bytes.toBytes(i), Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
-322      splits.add(split);
-323    }
-324    TableInputFormat tif = new TableInputFormat();
-325    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
-326
-327    assertEquals("Saw the wrong number of splits", 5, res.size());
-328    TableSplit ts1 = (TableSplit) res.get(0);
-329    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
-330    TableSplit ts2 = (TableSplit) res.get(1);
-331    assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength());
-332    TableSplit ts3 = (TableSplit) res.get(2);
-333    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
-334    TableSplit ts4 = (TableSplit) res.get(4);
-335    assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
-336  }
-337}
-338
+192    Job job = Job.getInstance(c, jobName);
+193    job.setMapperClass(ScanMapper.class);
+194    job.setReducerClass(ScanReducer.class);
+195    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+196    job.setMapOutputValueClass(ImmutableBytesWritable.class);
+197    job.setInputFormatClass(TableInputFormat.class);
+198    job.setNumReduceTasks(1);
+199    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+200    TableMapReduceUtil.addDependencyJars(job);
+201    assertTrue(job.waitForCompletion(true));
+202  }
+203
+204  /**
+205   * Tests a MR scan using specific start and stop rows.
+206   */
+207  protected void testScan(String start, String stop, String last)
+208      throws IOException, InterruptedException, ClassNotFoundException {
+209    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" +
+210      (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
+211    LOG.info("Before map/reduce startup - job " + jobName);
+212    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+213    Scan scan = new Scan();
+214    scan.addFamily(INPUT_FAMILYS[0]);
+215    scan.addFamily(INPUT_FAMILYS[1]);
+216    if (start != null) {
+217      scan.withStartRow(Bytes.toBytes(start));
+218    }
+219    c.set(KEY_STARTROW, start != null ? start : "");
+220    if (stop != null) {
+221      scan.withStopRow(Bytes.toBytes(stop));
+222    }
+223    c.set(KEY_LASTROW, last != null ? last : "");
last : ""); +224 LOG.info("scan before: " + scan); +225 Job job = Job.getInstance(c, jobName); +226 TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ScanMapper.class, +227 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); +228 job.setReducerClass(ScanReducer.class); +229 job.setNumReduceTasks(1); // one to get final "first" and "last" key +230 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); +231 LOG.info("Started " + job.getJobName()); +232 assertTrue(job.waitForCompletion(true)); +233 LOG.info("After map/reduce completion - job " + jobName); +234 } +235 +236 +237 /** +238 * Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX +239 * This test does not run MR job +240 */ +241 protected void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) +242 throws IOException, InterruptedException, ClassNotFoundException { +243 String jobName = "TestJobForNumOfSplits"; +244 LOG.info("Before map/reduce startup - job " + jobName); +245 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); +246 Scan scan = new Scan(); +247 scan.addFamily(INPUT_FAMILYS[0]); +248 scan.addFamily(INPUT_FAMILYS[1]); +249 c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); +250 c.set(KEY_STARTROW, ""); +251 c.set(KEY_LASTROW, ""); +252 Job job = Job.getInstance(c, jobName); +253 TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, +254 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); +255 TableInputFormat tif = new TableInputFormat(); +256 tif.setConf(job.getConfiguration()); +257 Assert.assertEquals(TABLE_NAME, table.getName()); +258 List<InputSplit> splits = tif.getSplits(job); +259 Assert.assertEquals(expectedNumOfSplits, splits.size()); +260 } +261 +262 /** +263 * Run MR job to check the number of mapper = expectedNumOfSplits +264 */ +265 protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) +266 throws IOException, InterruptedException, ClassNotFoundException { +267 String jobName = "TestJobForNumOfSplits-MR"; +268 LOG.info("Before map/reduce startup - job " + jobName); +269 JobConf c = new JobConf(TEST_UTIL.getConfiguration()); +270 Scan scan = new Scan(); +271 scan.addFamily(INPUT_FAMILYS[0]); +272 scan.addFamily(INPUT_FAMILYS[1]); +273 c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion); +274 c.set(KEY_STARTROW, ""); +275 c.set(KEY_LASTROW, ""); +276 Job job = Job.getInstance(c, jobName); +277 TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, +278 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); +279 job.setReducerClass(ScanReducer.class); +280 job.setNumReduceTasks(1); +281 job.setOutputFormatClass(NullOutputFormat.class); +282 assertTrue("job failed!", job.waitForCompletion(true)); +283 // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS, +284 // we use TaskCounter.SHUFFLED_MAPS to get total launched maps +285 assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits, +286 job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue()); +287 } +288 +289 /** +290 * Run MR job to test autobalance for setting number of mappers for TIF This does not run real MR +291 * job +292 */ +293 protected void testAutobalanceNumOfSplit() throws IOException { +294 // set up splits for testing +295 List<InputSplit> splits = new ArrayList<>(5); +296 int[] regionLen = { 10, 20, 20, 40, 60 }; +297 for 
(int i = 0; i < 5; i++) { +298 InputSplit split = new TableSplit(TABLE_NAME, new Scan(), Bytes.toBytes(i), +299 Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576); +300 splits.add(split); +301 } +302 TableInputFormat tif = new TableInputFormat(); +303 List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824); +304 +305 assertEquals("Saw the wrong number of splits", 5, res.size()); +306 TableSplit ts1 = (TableSplit) res.get(0); +307 assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow())); +308 TableSplit ts2 = (TableSplit) res.get(1); +309 assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength()); +310 TableSplit ts3 = (TableSplit) res.get(2); +311 assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow())); +312 TableSplit ts4 = (TableSplit) res.get(4); +313 assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow())); +314 } +315} +316
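
Note on the pattern under test: the revised file wires a bounded scan into a MapReduce job in two equivalent ways, either via the TableInputFormat.SCAN_ROW_START/SCAN_ROW_STOP configuration properties (testScanFromConfiguration) or via a Scan object handed to TableMapReduceUtil.initTableMapperJob (testScan). A minimal stand-alone driver in the second style might look like the sketch below. The table name reuses the "scantest" fixture and the row keys are illustrative only; the sketch assumes the hbase-mapreduce artifact and the test classes are on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TestTableInputFormatScanBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class BoundedScanDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Bound the scan to [startRow, stopRow); mappers only ever see rows
    // inside these bounds, which is what ScanReducer#cleanup asserts.
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("content1"));
    scan.withStartRow(Bytes.toBytes("bbb"));   // illustrative start row
    scan.withStopRow(Bytes.toBytes("yyy"));    // illustrative stop row (exclusive)
    Job job = Job.getInstance(conf, "BoundedScan");
    // Configures TableInputFormat, serializes the Scan into the job
    // configuration, and sets the mapper plus output key/value classes.
    TableMapReduceUtil.initTableMapperJob(TableName.valueOf("scantest"), scan,
      TestTableInputFormatScanBase.ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setNumReduceTasks(0);                  // map-only in this sketch
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}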
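
The expectations in testAutobalanceNumOfSplit come down to arithmetic over the five fabricated split sizes. Here is a back-of-envelope check; the behaviour it assumes (small adjacent splits merge up to the average split size, oversized ones are cut) is read off the assertions, not off the calculateAutoBalancedSplits implementation.

public class AutoBalanceArithmetic {
  public static void main(String[] args) {
    final long MB = 1048576L;
    long[] regionLen = { 10 * MB, 20 * MB, 20 * MB, 40 * MB, 60 * MB };
    long total = 0;
    for (long len : regionLen) {
      total += len;                       // 150 MB over five regions
    }
    long avg = total / regionLen.length;  // 30 MB average split size
    System.out.println("average split size = " + avg / MB + " MB");
    // 10 MB + 20 MB == 30 MB: the first two regions together match the
    // average, so they merge into a single split [0, 2) -- hence the
    // asserted end key 2, and the third result starting at key 3.
    // 60 MB is double the average, so that region gets cut up -- hence
    // res.get(4) no longer starts at the original region boundary 4.
  }
}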