To: core-commits@hadoop.apache.org
From: omalley@apache.org
Reply-To: core-dev@hadoop.apache.org
Subject: svn commit: r762216 - in /hadoop/core/trunk: CHANGES.txt src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Date: Mon, 06 Apr 2009 06:25:38 -0000
Message-Id: <20090406062538.E2B602388D4E@eris.apache.org>

Author: omalley
Date: Mon Apr  6 06:25:38 2009
New Revision: 762216

URL: http://svn.apache.org/viewvc?rev=762216&view=rev
Log:
HADOOP-5437. Fix TestMiniMRDFSSort to properly test jvm-reuse. (omalley)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=762216&r1=762215&r2=762216&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Apr  6 06:25:38 2009
@@ -722,6 +722,8 @@
     HADOOP-5468. Add sub-menus to forrest documentation and make some
     minor edits. (Corinne Chandel via szetszwo)
 
+    HADOOP-5437. Fix TestMiniMRDFSSort to properly test jvm-reuse. (omalley)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
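The rewrite below moves cluster startup out of the individual test methods
and into a one-time suite() fixture, so the MiniDFSCluster/MiniMRCluster pair
is started once for all test methods instead of once per test. This is JUnit
3's TestSetup decorator pattern; a minimal self-contained sketch of the
pattern follows (the ExpensiveResource class and test names here are
illustrative, not part of the commit):

    import junit.extensions.TestSetup;
    import junit.framework.Test;
    import junit.framework.TestCase;
    import junit.framework.TestSuite;

    // JUnit 3 pattern: run setUp/tearDown once around an entire suite,
    // rather than before/after every test method.
    public class ExpensiveResourceTest extends TestCase {

      // Stand-in for a costly shared fixture such as a mini-cluster.
      static class ExpensiveResource {
        void close() { /* release the resource */ }
      }

      static ExpensiveResource resource;   // shared by all test methods

      public static Test suite() {
        return new TestSetup(new TestSuite(ExpensiveResourceTest.class)) {
          protected void setUp() throws Exception {    // once, before all tests
            resource = new ExpensiveResource();
          }
          protected void tearDown() throws Exception { // once, after all tests
            resource.close();
          }
        };
      }

      public void testResourceIsShared() {
        assertNotNull(resource);
      }
    }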
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=762216&r1=762215&r2=762216&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Mon Apr  6 06:25:38 2009
@@ -18,10 +18,18 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
 import junit.framework.TestCase;
+import junit.framework.TestSuite;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
@@ -39,9 +47,30 @@
 
   // Knobs to control randomwriter; and hence sort
   private static final int NUM_HADOOP_SLAVES = 3;
-  private static final int RW_BYTES_PER_MAP = 2 * 1024 * 1024;
+  // make it big enough to cause a spill in the map
+  private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
   private static final int RW_MAPS_PER_HOST = 2;
 
+  private static MiniMRCluster mrCluster = null;
+  private static MiniDFSCluster dfsCluster = null;
+  private static FileSystem dfs = null;
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) {
+      protected void setUp() throws Exception {
+        Configuration conf = new Configuration();
+        dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
+        dfs = dfsCluster.getFileSystem();
+        mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES,
+                                      dfs.getUri().toString(), 1);
+      }
+      protected void tearDown() throws Exception {
+        if (dfsCluster != null) { dfsCluster.shutdown(); }
+        if (mrCluster != null) { mrCluster.shutdown(); }
+      }
+    };
+    return setup;
+  }
+
   private static void runRandomWriter(JobConf job, Path sortInput)
       throws Exception {
     // Scale down the default settings for RandomWriter for the test-case
@@ -57,8 +86,10 @@
 
   private static void runSort(JobConf job, Path sortInput, Path sortOutput)
       throws Exception {
+    job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
     job.setInt("io.sort.mb", 1);
-    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+    job.setNumMapTasks(12);
+
     // Setup command-line arguments to 'sort'
     String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
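For context: mapred.job.reuse.jvm.num.tasks is the knob this commit
exercises. A value of 1 (the default) gives every task attempt a fresh child
JVM; -1 lets a JVM run an unlimited number of tasks from the same job. A
minimal sketch of how a job opts in, assuming the classic
org.apache.hadoop.mapred API (the helper name withJvmReuse is ours, not part
of the commit):

    import org.apache.hadoop.mapred.JobConf;

    public class JvmReuseConfig {
      // -1: a child JVM may be reused for any number of tasks of this job;
      // 1 (the default): every task attempt gets its own JVM.
      static JobConf withJvmReuse(JobConf conf) {
        conf.setNumTasksToExecutePerJvm(-1);             // typed setter
        // equivalent to the raw property used in runSort() above:
        // conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
        return conf;
      }
    }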
@@ -75,34 +106,66 @@
     // Run Sort-Validator
     assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
   }
-
-  Configuration conf = new Configuration();
-
-  public void testMapReduceSort() throws Exception {
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-    try {
-
-      // Start the mini-MR and mini-DFS clusters
-      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
-      fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
-
-      // Run randomwriter to generate input for 'sort'
-      runRandomWriter(mr.createJobConf(), SORT_INPUT_PATH);
-
-      // Run sort
-      runSort(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
-
-      // Run sort-validator to check if sort worked correctly
-      runSortValidator(mr.createJobConf(), SORT_INPUT_PATH,
-                       SORT_OUTPUT_PATH);
-    } finally {
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown();
-      }
+
+  private static class ReuseDetector extends MapReduceBase
+      implements Mapper<BytesWritable, BytesWritable, Text, Text> {
+    static int instances = 0;
+    Reporter reporter = null;
+
+    @Override
+    public void map(BytesWritable key, BytesWritable value,
+                    OutputCollector<Text, Text> output,
+                    Reporter reporter) throws IOException {
+      this.reporter = reporter;
+    }
+
+    public void close() throws IOException {
+      reporter.incrCounter("jvm", "use", ++instances);
     }
   }
 
-  public void testMapReduceSortWithJvmReuse() throws Exception {
-    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
-    testMapReduceSort();
+  private static void runJvmReuseTest(JobConf job,
+                                      boolean reuse) throws IOException {
+    // setup a map-only job that reads the input and only sets the counters
+    // based on how many times the jvm was reused.
+    job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
+    FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setMapperClass(ReuseDetector.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumMapTasks(24);
+    job.setNumReduceTasks(0);
+    RunningJob result = JobClient.runJob(job);
+    long uses = result.getCounters().findCounter("jvm", "use").getValue();
+    System.out.println("maps = " + job.getNumMapTasks());
+    System.out.println(result.getCounters());
+    int maps = job.getNumMapTasks();
+    if (reuse) {
+      assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
+    } else {
+      assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
+    }
+  }
+
+  public void testMapReduceSort() throws Exception {
+    // Run randomwriter to generate input for 'sort'
+    runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH);
+
+    // Run sort
+    runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
+
+    // Run sort-validator to check if sort worked correctly
+    runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH,
+                     SORT_OUTPUT_PATH);
+  }
+
+  public void testJvmReuse() throws Exception {
+    runJvmReuseTest(mrCluster.createJobConf(), true);
+  }
+
+  public void testNoJvmReuse() throws Exception {
+    runJvmReuseTest(mrCluster.createJobConf(), false);
   }
 }
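The assertions in runJvmReuseTest() rest on a small piece of arithmetic:
ReuseDetector.instances is static, so it is shared by every mapper instance
created inside one child JVM, and close() adds the running count (1, 2, 3,
...) to the "jvm"/"use" counter. A JVM that runs k tasks therefore
contributes 1 + 2 + ... + k = k(k+1)/2 to the counter, so the total equals
the number of maps exactly when no JVM is reused, and exceeds it as soon as
any JVM runs a second task. A standalone sketch of that arithmetic, outside
MapReduce (all names here are illustrative):

    // Simulates ReuseDetector's counter bookkeeping without a cluster.
    public class JvmReuseCounterDemo {
      static int instances = 0;   // static: shared across tasks in one JVM

      // One task attempt: a new mapper is created, and close() reports
      // the running per-JVM instance count as a counter increment.
      static long runTask() {
        return ++instances;       // 1, 2, 3, ... within a single JVM
      }

      public static void main(String[] args) {
        // No reuse: 24 tasks, each in a fresh JVM, each contributes 1.
        long total = 0;
        for (int task = 0; task < 24; task++) {
          instances = 0;          // a fresh JVM starts with a zeroed static
          total += runTask();
        }
        System.out.println("no reuse: " + total);   // 24 == number of maps

        // Reuse: the same 24 tasks packed onto 6 JVMs, 4 tasks each;
        // each JVM contributes 1 + 2 + 3 + 4 = 10.
        total = 0;
        for (int jvm = 0; jvm < 6; jvm++) {
          instances = 0;
          for (int task = 0; task < 4; task++) {
            total += runTask();
          }
        }
        System.out.println("reuse: " + total);      // 60 > 24
      }
    }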