hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r762216 - in /hadoop/core/trunk: CHANGES.txt src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Date: Mon, 06 Apr 2009 06:25:38 GMT
Author: omalley
Date: Mon Apr  6 06:25:38 2009
New Revision: 762216

URL: http://svn.apache.org/viewvc?rev=762216&view=rev
Log:
HADOOP-5437. Fix TestMiniMRDFSSort to properly test jvm-reuse. (omalley)
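
For context, the old mapred API controls task-JVM reuse through the
mapred.job.reuse.jvm.num.tasks property: 1 (the default) gives each task a
fresh JVM, and -1 lets a JVM run an unlimited number of a job's tasks. A
minimal sketch of how a job opts in (illustrative only, not part of this
commit):

    import org.apache.hadoop.mapred.JobConf;

    JobConf job = new JobConf();
    // Typed setter for the same knob the patch sets by name below;
    // -1 means no limit on tasks per JVM.
    job.setNumTasksToExecutePerJvm(-1);
    // Equivalent string form, as used in the patch:
    job.setInt("mapred.job.reuse.jvm.num.tasks", -1);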

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=762216&r1=762215&r2=762216&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Apr  6 06:25:38 2009
@@ -722,6 +722,8 @@
     HADOOP-5468. Add sub-menus to forrest documentation and make some minor
     edits.  (Corinne Chandel via szetszwo)
 
+    HADOOP-5437. Fix TestMiniMRDFSSort to properly test jvm-reuse. (omalley)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=762216&r1=762215&r2=762216&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Mon Apr  6 06:25:38 2009
@@ -18,10 +18,18 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
 import junit.framework.TestCase;
+import junit.framework.TestSuite;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
@@ -39,9 +47,30 @@
 
   // Knobs to control randomwriter; and hence sort
   private static final int NUM_HADOOP_SLAVES = 3;
-  private static final int RW_BYTES_PER_MAP = 2 * 1024 * 1024;
+  // make it big enough to cause a spill in the map
+  private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
   private static final int RW_MAPS_PER_HOST = 2;
 
+  private static MiniMRCluster mrCluster = null;
+  private static MiniDFSCluster dfsCluster = null;
+  private static FileSystem dfs = null;
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) {
+      protected void setUp() throws Exception {
+        Configuration conf = new Configuration();
+        dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
+        dfs = dfsCluster.getFileSystem();
+        mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES, 
+                                      dfs.getUri().toString(), 1);
+      }
+      protected void tearDown() throws Exception {
+        if (dfsCluster != null) { dfsCluster.shutdown(); }
+        if (mrCluster != null) { mrCluster.shutdown(); }
+      }
+    };
+    return setup;
+  }
+
   private static void runRandomWriter(JobConf job, Path sortInput) 
   throws Exception {
     // Scale down the default settings for RandomWriter for the test-case
@@ -57,8 +86,10 @@
   private static void runSort(JobConf job, Path sortInput, Path sortOutput) 
   throws Exception {
 
+    job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
     job.setInt("io.sort.mb", 1);
-    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+    job.setNumMapTasks(12);
+
     // Setup command-line arguments to 'sort'
     String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
     
@@ -75,34 +106,66 @@
     // Run Sort-Validator
     assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
   }
-  Configuration conf = new Configuration();
-  public void testMapReduceSort() throws Exception {
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-    try {
-
-      // Start the mini-MR and mini-DFS clusters
-      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
-      fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
-
-      // Run randomwriter to generate input for 'sort'
-      runRandomWriter(mr.createJobConf(), SORT_INPUT_PATH);
-      
-      // Run sort
-      runSort(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
-      
-      // Run sort-validator to check if sort worked correctly
-      runSortValidator(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
-    } finally {
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown();
-      }
+  
+  private static class ReuseDetector extends MapReduceBase
+      implements Mapper<BytesWritable,BytesWritable, Text, Text> {
+    static int instances = 0;
+    Reporter reporter = null;
+
+    @Override
+    public void map(BytesWritable key, BytesWritable value,
+                    OutputCollector<Text, Text> output, 
+                    Reporter reporter) throws IOException {
+      this.reporter = reporter;
+    }
+    
+    public void close() throws IOException {
+      reporter.incrCounter("jvm", "use", ++instances);
     }
   }
-  public void testMapReduceSortWithJvmReuse() throws Exception {
-    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
-    testMapReduceSort();
+
+  private static void runJvmReuseTest(JobConf job,
+                                      boolean reuse) throws IOException {
+    // setup a map-only job that reads the input and only sets the counters
+    // based on how many times the jvm was reused.
+    job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
+    FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setMapperClass(ReuseDetector.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumMapTasks(24);
+    job.setNumReduceTasks(0);
+    RunningJob result = JobClient.runJob(job);
+    long uses = result.getCounters().findCounter("jvm", "use").getValue();
+    System.out.println("maps = " + job.getNumMapTasks());
+    System.out.println(result.getCounters());
+    int maps = job.getNumMapTasks();
+    if (reuse) {
+      assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
+    } else {
+      assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
+    }
+  }
+
+  public void testMapReduceSort() throws Exception {
+    // Run randomwriter to generate input for 'sort'
+    runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH);
+
+    // Run sort
+    runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
+
+    // Run sort-validator to check if sort worked correctly
+    runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH, 
+                     SORT_OUTPUT_PATH);
+  }
+  
+  public void testJvmReuse() throws Exception {
+    runJvmReuseTest(mrCluster.createJobConf(), true);
+  }
+
+  public void testNoJvmReuse() throws Exception {
+    runJvmReuseTest(mrCluster.createJobConf(), false);
   }
 }
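
A quick sketch of the arithmetic the new assertions rely on (illustrative
only, not part of the commit): ReuseDetector.instances is static, so it
starts at 0 in each fresh task JVM and survives across tasks only when the
JVM is reused. Each task's close() adds ++instances to the "jvm"/"use"
counter, so a JVM that runs n tasks contributes 1 + 2 + ... + n =
n*(n+1)/2. Without reuse every JVM runs one task and the counter total
equals the number of maps; with reuse at least one JVM runs several tasks,
so the total must exceed the map count.

    // Standalone illustration of the counter totals checked above.
    public class ReuseCounterMath {
      static long contribution(int tasksPerJvm) {
        long total = 0;
        for (int i = 1; i <= tasksPerJvm; i++) {
          total += i;                  // close() adds ++instances
        }
        return total;                  // == n * (n + 1) / 2
      }

      public static void main(String[] args) {
        // 24 maps, no reuse: 24 JVMs x 1 task -> uses == 24 == maps.
        System.out.println(24 * contribution(1));   // 24
        // 24 maps packed into, say, 3 reused JVMs of 8 tasks each:
        // 3 * (1+2+...+8) = 3 * 36 = 108 -> uses > maps.
        System.out.println(3 * contribution(8));    // 108
      }
    }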


