hadoop-common-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r794103 - in /hadoop/common/branches/branch-0.20: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Date Tue, 14 Jul 2009 22:42:07 GMT
Author: cdouglas
Date: Tue Jul 14 22:42:06 2009
New Revision: 794103

URL: http://svn.apache.org/viewvc?rev=794103&view=rev
Log:
MAPREDUCE-565. Fix partitioner to work with new API. Contributed by Owen O'Malley
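
For context, the two partitioner contracts this patch bridges look roughly like
this (trimmed sketches of the 0.20-era signatures; each type lives in its own
package, shown here side by side):

    // Old (deprecated) API, org.apache.hadoop.mapred:
    public interface Partitioner<K2, V2> extends JobConfigurable {
      // JobConfigurable contributes: void configure(JobConf job);
      int getPartition(K2 key, V2 value, int numPartitions);
    }

    // New API, org.apache.hadoop.mapreduce:
    public abstract class Partitioner<KEY, VALUE> {
      public abstract int getPartition(KEY key, VALUE value, int numPartitions);
    }

Since the two types share no common supertype, MapTask below gains one collector
wrapper per API (OldOutputCollector/NewOutputCollector), each holding its own
partitioner and passing the computed partition down to MapOutputCollector.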

Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=794103&r1=794102&r2=794103&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Tue Jul 14 22:42:06 2009
@@ -175,6 +175,9 @@
     HADOOP-6145. Fix FsShell rm/rmr error messages when there is a FNFE.
     (Jakob Homan via szetszwo)
 
+    MAPREDUCE-565. Fix partitioner to work with new API. (Owen O'Malley via
+    cdouglas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=794103&r1=794102&r2=794103&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Jul 14 22:42:06 2009
@@ -353,7 +353,7 @@
       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
-      runner.run(in, collector, reporter);      
+      runner.run(in, new OldOutputCollector(collector, conf), reporter);
       collector.flush();
     } finally {
       //close
@@ -427,20 +427,80 @@
     }
   }
 
+  /**
+   * Since the mapred and mapreduce Partitioners don't share a common interface
+   * (JobConfigurable is deprecated and a supertype of mapred.Partitioner), the
+   * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
+   * the configured partitioner should not be called: it's common for
+   * partitioners to compute a result mod numReduces, which divides by zero.
+   */
+  private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
+    private final Partitioner<K,V> partitioner;
+    private final MapOutputCollector<K,V> collector;
+    private final int numPartitions;
+
+    @SuppressWarnings("unchecked")
+    OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
+      numPartitions = conf.getNumReduceTasks();
+      if (numPartitions > 0) {
+        partitioner = (Partitioner<K,V>)
+          ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
+      } else {
+        partitioner = new Partitioner<K,V>() {
+          @Override
+          public void configure(JobConf job) { }
+          @Override
+          public int getPartition(K key, V value, int numPartitions) {
+            return -1;
+          }
+        };
+      }
+      this.collector = collector;
+    }
+
+    @Override
+    public void collect(K key, V value) throws IOException {
+      try {
+        collector.collect(key, value,
+                          partitioner.getPartition(key, value, numPartitions));
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new IOException("interrupt exception", ie);
+      }
+    }
+  }
+
   private class NewOutputCollector<K,V>
     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
-    private MapOutputCollector<K,V> collector;
+    private final MapOutputCollector<K,V> collector;
+    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
+    private final int partitions;
 
-    NewOutputCollector(JobConf job, 
+    @SuppressWarnings("unchecked")
+    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
+                       JobConf job,
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
       collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+      partitions = jobContext.getNumReduceTasks();
+      if (partitions > 0) {
+        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
+          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
+      } else {
+        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
+          @Override
+          public int getPartition(K key, V value, int numPartitions) {
+            return -1;
+          }
+        };
+      }
     }
 
     @Override
-    public void write(K key, V value) throws IOException {
-      collector.collect(key, value);
+    public void write(K key, V value) throws IOException, InterruptedException {
+      collector.collect(key, value,
+                        partitioner.getPartition(key, value, partitions));
     }
 
     @Override
@@ -510,7 +570,7 @@
       if (job.getNumReduceTasks() == 0) {
         output = outputFormat.getRecordWriter(taskContext);
       } else {
-        output = new NewOutputCollector(job, umbilical, reporter);
+        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
       }
 
       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
@@ -532,9 +592,10 @@
     }
   }
 
-  interface MapOutputCollector<K, V>
-    extends OutputCollector<K, V> {
+  interface MapOutputCollector<K, V> {
 
+    public void collect(K key, V value, int partition
+                        ) throws IOException, InterruptedException;
     public void close() throws IOException, InterruptedException;
     
     public void flush() throws IOException, InterruptedException, 
@@ -574,7 +635,7 @@
                                ClassNotFoundException {
     }
 
-    public void collect(K key, V value) throws IOException {
+    public void collect(K key, V value, int partition) throws IOException {
       reporter.progress();
       out.write(key, value);
       mapOutputRecordCounter.increment(1);
@@ -585,7 +646,6 @@
   class MapOutputBuffer<K extends Object, V extends Object> 
   implements MapOutputCollector<K, V>, IndexedSortable {
     private final int partitions;
-    private final Partitioner<K, V> partitioner;
     private final JobConf job;
     private final TaskReporter reporter;
     private final Class<K> keyClass;
@@ -653,7 +713,6 @@
       this.reporter = reporter;
       localFs = FileSystem.getLocal(job);
       partitions = job.getNumReduceTasks();
-      partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
        
       rfs = ((LocalFileSystem)localFs).getRaw();
 
@@ -739,8 +798,8 @@
       }
     }
 
-    public synchronized void collect(K key, V value)
-        throws IOException {
+    public synchronized void collect(K key, V value, int partition
+                                     ) throws IOException {
       reporter.progress();
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
@@ -801,7 +860,6 @@
         valSerializer.serialize(value);
         int valend = bb.markRecord();
 
-        final int partition = partitioner.getPartition(key, value, partitions);
         if (partition < 0 || partition >= partitions) {
           throw new IOException("Illegal partition for " + key + " (" +
               partition + ")");
@@ -821,7 +879,7 @@
         kvindex = kvnext;
       } catch (MapBufferTooSmallException e) {
         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
-        spillSingleRecord(key, value);
+        spillSingleRecord(key, value, partition);
         mapOutputRecordCounter.increment(1);
         return;
       }
@@ -1201,11 +1259,10 @@
      * the in-memory buffer, so we must spill the record from collect
      * directly to a spill file. Consider this "losing".
      */
-    private void spillSingleRecord(final K key, final V value) 
-        throws IOException {
+    private void spillSingleRecord(final K key, final V value,
+                                   int partition) throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
-      final int partition = partitioner.getPartition(key, value, partitions);
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
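
The javadoc on OldOutputCollector above is the heart of the fix: with zero
reduce tasks the configured partitioner must never be invoked, because the
common idiom is to reduce a hash mod numReduces, and a modulus of zero throws.
A standalone illustration of that failure mode (HashPartitioner-style
arithmetic; Div0Demo is a hypothetical name for this sketch):

    public class Div0Demo {
      // The usual HashPartitioner-style partition function.
      static int getPartition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }

      public static void main(String[] args) {
        System.out.println(getPartition("word", 2)); // fine: prints 0 or 1
        System.out.println(getPartition("word", 0)); // ArithmeticException: / by zero
      }
    }

Hence both collectors substitute a stub that returns -1 when numReduces is 0;
in the map-only path the partition argument is ignored anyway (see
DirectMapOutputCollector.collect above), so the sentinel is harmless.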

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=794103&r1=794102&r2=794103&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Jul 14 22:42:06 2009
@@ -27,6 +27,12 @@
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
+import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
+import org.apache.hadoop.examples.SecondarySort.IntPair;
+import org.apache.hadoop.examples.SecondarySort;
+import org.apache.hadoop.examples.WordCount;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -35,6 +41,10 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.MRCaching.TestResult;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TestMapReduceLocal;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -45,7 +55,8 @@
     new File(System.getProperty("test.build.data","/tmp"))
     .toURI().toString().replace(' ', '+');
     
-  public void testWithLocal() throws IOException {
+  public void testWithLocal()
+      throws IOException, InterruptedException, ClassNotFoundException {
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(2, "file:///", 3);
@@ -80,6 +91,7 @@
       assertEquals("number of reduce outputs", 9, 
                    counters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS));
       runCustomFormats(mr);
+      runSecondarySort(mr.createJobConf());
     } finally {
       if (mr != null) { mr.shutdown(); }
     }
@@ -284,4 +296,47 @@
                                  JobConf job) throws IOException {
     }
   }
+
+  private void runSecondarySort(Configuration conf) throws IOException,
+                                                        InterruptedException,
+                                                        ClassNotFoundException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    TestMapReduceLocal.writeFile
+             ("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
+              "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
+    Job job = new Job(conf, "word count");
+    job.setJarByClass(WordCount.class);
+    job.setNumReduceTasks(2);
+    job.setMapperClass(SecondarySort.MapClass.class);
+    job.setReducerClass(SecondarySort.Reduce.class);
+    // group and partition by the first int in the pair
+    job.setPartitionerClass(FirstPartitioner.class);
+    job.setGroupingComparatorClass(FirstGroupingComparator.class);
+
+    // the map output is IntPair, IntWritable
+    job.setMapOutputKeyClass(IntPair.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    // the reduce output is Text, IntWritable
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+    assertTrue(job.waitForCompletion(true));
+    String out = TestMapReduceLocal.readFile("out/part-r-00000");
+    assertEquals("------------------------------------------------\n" +
+                 "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
+                 "------------------------------------------------\n" +
+                 "10\t20\n10\t25\n10\t30\n", out);
+    out = TestMapReduceLocal.readFile("out/part-r-00001");
+    assertEquals("------------------------------------------------\n" +
+                 "-3\t23\n" +
+                 "------------------------------------------------\n" +
+                 "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
+                 "------------------------------------------------\n" +
+                 "5\t10\n", out);
+  }
 }
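
The secondary-sort test moves here from TestMapReduceLocal (diff below) and now
sets two reduce tasks, so it drives a user-supplied new-API partitioner end to
end; as the MapTask diff above shows, MapOutputBuffer previously instantiated
only old-API partitioners via job.getPartitionerClass(), so a partitioner set
through the new API was not honored. For reference, the partitioner this test
configures routes records by the first int of the composite key, along these
lines (a paraphrase of SecondarySort.FirstPartitioner from the Hadoop examples;
the exact body may differ, and FirstPartitionerSketch is a hypothetical
stand-in name):

    import org.apache.hadoop.examples.SecondarySort.IntPair;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Send every record whose composite key shares a "first" int to the
    // same reducer, so the grouping comparator can then walk its values.
    public class FirstPartitionerSketch extends Partitioner<IntPair, IntWritable> {
      @Override
      public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        return Math.abs(key.getFirst() * 127) % numPartitions;
      }
    }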

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=794103&r1=794102&r2=794103&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Jul 14 22:42:06 2009
@@ -27,11 +27,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SecondarySort;
 import org.apache.hadoop.examples.WordCount;
-import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
-import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
-import org.apache.hadoop.examples.SecondarySort.IntPair;
 import org.apache.hadoop.examples.WordCount.IntSumReducer;
 import org.apache.hadoop.examples.WordCount.TokenizerMapper;
 import org.apache.hadoop.fs.FileSystem;
@@ -61,7 +57,7 @@
     }
   }
 
-  public Path writeFile(String name, String data) throws IOException {
+  public static Path writeFile(String name, String data) throws IOException {
     Path file = new Path(TEST_ROOT_DIR + "/" + name);
     localFs.delete(file, false);
     DataOutputStream f = localFs.create(file);
@@ -70,7 +66,7 @@
     return file;
   }
 
-  public String readFile(String name) throws IOException {
+  public static String readFile(String name) throws IOException {
     DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
     BufferedReader b = new BufferedReader(new InputStreamReader(f));
     StringBuilder result = new StringBuilder();
@@ -90,7 +86,6 @@
       mr = new MiniMRCluster(2, "file:///", 3);
       Configuration conf = mr.createJobConf();
       runWordCount(conf);
-      runSecondarySort(conf);
     } finally {
       if (mr != null) { mr.shutdown(); }
     }
@@ -162,43 +157,4 @@
     assertTrue("combine in > combine out", combineIn > combineOut);
   }
 
-  private void runSecondarySort(Configuration conf) throws IOException,
-                                                        InterruptedException,
-                                                        ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-    writeFile("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
-              "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
-    Job job = new Job(conf, "word count");     
-    job.setJarByClass(WordCount.class);
-    job.setMapperClass(SecondarySort.MapClass.class);
-    job.setReducerClass(SecondarySort.Reduce.class);
-    // group and partition by the first int in the pair
-    job.setPartitionerClass(FirstPartitioner.class);
-    job.setGroupingComparatorClass(FirstGroupingComparator.class);
-
-    // the map output is IntPair, IntWritable
-    job.setMapOutputKeyClass(IntPair.class);
-    job.setMapOutputValueClass(IntWritable.class);
-
-    // the reduce output is Text, IntWritable
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-    
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
-    assertTrue(job.waitForCompletion(true));
-    String out = readFile("out/part-r-00000");
-    assertEquals("------------------------------------------------\n" +
-                 "-3\t23\n" +
-                 "------------------------------------------------\n" +
-                 "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
-                 "------------------------------------------------\n" +
-                 "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
-                 "------------------------------------------------\n" +
-                 "5\t10\n" +
-                 "------------------------------------------------\n" +
-                 "10\t20\n10\t25\n10\t30\n", out);
-  }
-  
 }


