hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r902704 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Date Mon, 25 Jan 2010 04:49:44 GMT
Author: cdouglas
Date: Mon Jan 25 04:49:43 2010
New Revision: 902704

URL: http://svn.apache.org/viewvc?rev=902704&view=rev
Log:
MAPREDUCE-1287. Only call the partitioner with more than one reducer.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=902704&r1=902703&r2=902704&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jan 25 04:49:43 2010
@@ -4,6 +4,9 @@
 
   INCOMPATIBLE CHANGES
 
+    MAPREDUCE-1287. Only call the partitioner with more than one reducer.
+    (cdouglas)
+
   NEW FEATURES
 
     MAPREDUCE-698. Per-pool task limits for the fair scheduler.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=902704&r1=902703&r2=902704&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Jan 25 04:49:43 2010
@@ -483,7 +483,7 @@
     @SuppressWarnings("unchecked")
     OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
       numPartitions = conf.getNumReduceTasks();
-      if (numPartitions > 0) {
+      if (numPartitions > 1) {
         partitioner = (Partitioner<K,V>)
           ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
       } else {
@@ -492,7 +492,7 @@
           public void configure(JobConf job) { }
           @Override
           public int getPartition(K key, V value, int numPartitions) {
-            return -1;
+            return numPartitions - 1;
           }
         };
       }
@@ -562,14 +562,14 @@
                        ) throws IOException, ClassNotFoundException {
       collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
       partitions = jobContext.getNumReduceTasks();
-      if (partitions > 0) {
+      if (partitions > 1) {
         partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
           ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
       } else {
         partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
           @Override
           public int getPartition(K key, V value, int numPartitions) {
-            return -1;
+            return partitions - 1;
           }
         };
       }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java?rev=902704&r1=902703&r2=902704&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java Mon Jan 25 04:49:43 2010
@@ -34,7 +34,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -308,56 +307,6 @@
       
   }
 
-  private static class BadPartitioner
-      implements Partitioner<LongWritable,Text> {
-    boolean low;
-    public void configure(JobConf conf) {
-      low = conf.getBoolean("test.testmapred.badpartition", true);
-    }
-    public int getPartition(LongWritable k, Text v, int numPartitions) {
-      return low ? -1 : numPartitions;
-    }
-  }
-
-  @Test
-  public void testPartitioner() throws Exception {
-    JobConf conf = new JobConf(TestMapRed.class);
-    conf.setPartitionerClass(BadPartitioner.class);
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path testdir = new Path(
-        System.getProperty("test.build.data","/tmp")).makeQualified(fs);
-    Path inFile = new Path(testdir, "blah/blah");
-    DataOutputStream f = fs.create(inFile);
-    f.writeBytes("blah blah blah\n");
-    f.close();
-    FileInputFormat.setInputPaths(conf, inFile);
-    FileOutputFormat.setOutputPath(conf, new Path(testdir, "out"));
-    conf.setMapperClass(IdentityMapper.class);
-    conf.setReducerClass(IdentityReducer.class);
-    conf.setOutputKeyClass(LongWritable.class);
-    conf.setOutputValueClass(Text.class);
-
-    // partition too low
-    conf.setBoolean("test.testmapred.badpartition", true);
-    boolean pass = true;
-    try {
-      JobClient.runJob(conf);
-    } catch (IOException e) {
-      pass = false;
-    }
-    assertFalse("should fail for partition < 0", pass);
-
-    // partition too high
-    conf.setBoolean("test.testmapred.badpartition", false);
-    pass = true;
-    try {
-      JobClient.runJob(conf);
-    } catch (IOException e) {
-      pass = false;
-    }
-    assertFalse("should fail for partition >= numPartitions", pass);
-  }
-
   public static class NullMapper
       implements Mapper<NullWritable,Text,NullWritable,Text> {
     public void map(NullWritable key, Text val,

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=902704&r1=902703&r2=902704&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Mon Jan 25 04:49:43 2010
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 
 import junit.extensions.TestSetup;
@@ -28,7 +29,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -178,4 +182,56 @@
   public void testNoJvmReuse() throws Exception {
     runJvmReuseTest(mrCluster.createJobConf(), false);
   }
+
+  private static class BadPartitioner
+      implements Partitioner<LongWritable,Text> {
+    boolean low;
+    public void configure(JobConf conf) {
+      low = conf.getBoolean("test.testmapred.badpartition", true);
+    }
+    public int getPartition(LongWritable k, Text v, int numPartitions) {
+      return low ? -1 : numPartitions;
+    }
+  }
+
+  public void testPartitioner() throws Exception {
+    JobConf conf = mrCluster.createJobConf();
+    conf.setPartitionerClass(BadPartitioner.class);
+    conf.setNumReduceTasks(3);
+    FileSystem fs = FileSystem.get(conf);
+    Path testdir =
+      new Path("blah").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path inFile = new Path(testdir, "blah");
+    DataOutputStream f = fs.create(inFile);
+    f.writeBytes("blah blah blah\n");
+    f.close();
+    FileInputFormat.setInputPaths(conf, inFile);
+    FileOutputFormat.setOutputPath(conf, new Path(testdir, "out"));
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setMaxMapAttempts(1);
+
+    // partition too low
+    conf.setBoolean("test.testmapred.badpartition", true);
+    boolean pass = true;
+    try {
+      JobClient.runJob(conf);
+    } catch (IOException e) {
+      pass = false;
+    }
+    assertFalse("should fail for partition < 0", pass);
+
+    // partition too high
+    conf.setBoolean("test.testmapred.badpartition", false);
+    pass = true;
+    try {
+      JobClient.runJob(conf);
+    } catch (IOException e) {
+      pass = false;
+    }
+    assertFalse("should fail for partition >= numPartitions", pass);
+  }
+
 }



Mime
View raw message