hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r555690 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Thu, 12 Jul 2007 16:40:11 GMT
Author: omalley
Date: Thu Jul 12 09:40:10 2007
New Revision: 555690

URL: http://svn.apache.org/viewvc?view=rev&rev=555690
Log:
HADOOP-1535.  Fix the comparator used to merge in reduce phase.

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
Removed:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestUserValueGrouping.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555690&r1=555689&r2=555690
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jul 12 09:40:10 2007
@@ -328,6 +328,9 @@
 101. HADOOP-1596.  Fix the parsing of taskids by streaming and improve the
      error reporting. (omalley)
 
+102. HADOOP-1535.  Fix the user-controlled grouping to the reduce function.
+     (Vivek Ratan via omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=555690&r1=555689&r2=555690
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Jul 12 09:40:10
2007
@@ -518,7 +518,7 @@
       {
         //create a sorter object as we need access to the SegmentDescriptor
         //class and merge methods
-        Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
+        Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(), valClass, job);
         sorter.setProgressable(reporter);
         
         for (int parts = 0; parts < partitions; parts++){

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=555690&r1=555689&r2=555690
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jul 12 09:40:10
2007
@@ -237,7 +237,6 @@
 
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
-    Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                                            job.getReducerClass(), job);
 
@@ -276,8 +275,6 @@
     
     Path tempDir = new Path(getTaskId()); 
 
-    WritableComparator comparator = job.getOutputValueGroupingComparator();
-    
     SequenceFile.Sorter.RawKeyValueIterator rIter;
  
     setPhase(TaskStatus.Phase.SORT); 
@@ -285,8 +282,8 @@
     final Reporter reporter = getReporter(umbilical);
     
     // sort the input file
-    SequenceFile.Sorter sorter =
-      new SequenceFile.Sorter(lfs, comparator, valueClass, job);
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs, 
+        job.getOutputKeyComparator(), job.getMapOutputValueClass(), job);
     sorter.setProgressable(reporter);
     rIter = sorter.merge(mapFiles, tempDir, 
         !conf.getKeepFailedTaskFiles()); // sort
@@ -315,8 +312,10 @@
     try {
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
-      ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator, 
-                                                             keyClass, valClass, job, reporter);
+      
+      ReduceValuesIterator values = new ReduceValuesIterator(rIter, 
+          job.getOutputValueGroupingComparator(), keyClass, valClass, 
+          job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java?view=auto&rev=555690
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java Thu Jul 12
09:40:10 2007
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.BooleanWritable.Comparator;
+import org.apache.hadoop.mapred.lib.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+
+/**
+ * Two different types of comparators can be used in MapReduce. One is used
+ * during the Map and Reduce phases, to sort/merge key-value pairs. Another
+ * is used to group values for a particular key, when calling the user's 
+ * reducer. A user can override both of these two. 
+ * This class has tests for making sure we use the right comparators at the 
+ * right places. See Hadoop issues 485 and 1535. Our tests: 
+ * 1. Test that the same comparator is used for all sort/merge operations 
+ * during the Map and Reduce phases.  
+ * 2. Test the common use case where values are grouped by a primary key and 
+ * the values within each group are sorted by a secondary key (e.g. a timestamp). 
+ */
+public class TestComparators extends TestCase 
+{
+  JobConf conf = new JobConf(TestMapOutputType.class);
+  JobClient jc;
+  static Random rng = new Random();
+
+  /** 
+   * RandomGenMapper is a mapper that generates 5 random values for each key
+   * in the input. The values are in the range [0-4]. The mapper also
+   * generates a composite key. If the input key is x and the generated
+   * value is y, the composite key is x0y (x-zero-y), i.e. x * 100 + y, so
+   * the intermediate key-value pairs can be ordered by {input key, value}.
+   * Think of the random value as a timestamp associated with the record. 
+   */
+  static class RandomGenMapper implements Mapper {
+    public void configure(JobConf job) {
+    }
+    
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector out, Reporter reporter) throws IOException {
+      int num_values = 5;
+      for(int i = 0; i < num_values; ++i) {
+        int val = rng.nextInt(num_values); // secondary key ("timestamp") in [0, num_values)
+        int compositeKey = ((IntWritable)(key)).get() * 100 + val; // primary key * 100 + secondary key
+        out.collect(new IntWritable(compositeKey), new IntWritable(val));
+      }
+    }
+    
+    public void close() {
+    }
+  }
+  
+  /** 
+   * Identity mapper: emits every input key/value pair unchanged. 
+   */
+  static class IdentityMapper implements Mapper {
+    public void configure(JobConf job) {
+    }
+    
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector out, Reporter reporter) throws IOException {
+      out.collect(key, value);
+    }
+    
+    public void close() {
+    }
+  }
+  
+  /** 
+   * Checks that keys arrive in ascending (non-decreasing) order across reduce calls.  
+   */
+  static class AscendingKeysReducer implements Reducer {
+    public void configure(JobConf job) {}
+
+    // last key seen; instance state, so it persists across reduce() calls
+    private int lastKey = Integer.MIN_VALUE;
+    public void reduce(WritableComparable key, Iterator values, 
+        OutputCollector out, Reporter reporter) throws IOException {
+      int currentKey = ((IntWritable)(key)).get();
+      // keys should be in ascending order; an equal key is accepted
+      if (currentKey < lastKey) {
+        fail("Keys not in sorted ascending order");
+      }
+      lastKey = currentKey;
+      out.collect(key, new Text("success"));
+    }
+    
+    public void close() {}
+  }
+  
+  /** 
+   * Checks that keys arrive in descending (non-increasing) order across reduce calls.  
+   */
+  static class DescendingKeysReducer implements Reducer {
+    public void configure(JobConf job) {}
+
+    // last key seen; instance state, so it persists across reduce() calls
+    private int lastKey = Integer.MAX_VALUE;
+    public void reduce(WritableComparable key, Iterator values, 
+        OutputCollector out, Reporter reporter) throws IOException {
+      int currentKey = ((IntWritable)(key)).get();
+      // keys should be in descending order; an equal key is accepted
+      if (currentKey > lastKey) {
+        fail("Keys not in sorted descending order");
+      }
+      lastKey = currentKey;
+      out.collect(key, new Text("success"));
+    }
+    
+    public void close() {}
+  }
+  
+  /** The reducer checks whether the input values are in ascending order and
+   * whether they are correctly grouped by key (i.e. each call to reduce
+   * should have 5 values if the grouping is correct). It also checks whether
+   * the keys themselves are in ascending (non-decreasing) order.
+   */
+  static class AscendingGroupReducer implements Reducer {
+    
+    public void configure(JobConf job) {
+    }
+
+    // last key seen; instance state, so it persists across reduce() calls
+    private int lastKey = Integer.MIN_VALUE;
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector out,
+                       Reporter reporter) throws IOException {
+      // check key order
+      int currentKey = ((IntWritable)(key)).get();
+      if (currentKey < lastKey) {
+        fail("Keys not in sorted ascending order");
+      }
+      lastKey = currentKey;
+      // check order of values
+      IntWritable previous = new IntWritable(Integer.MIN_VALUE); // sentinel: no value compares below it
+      int valueCount = 0;
+      while (values.hasNext()) {
+        IntWritable current = (IntWritable) values.next();
+        
+        // Check that the values are sorted (equal neighbours are accepted)
+        if (current.compareTo(previous) < 0)
+          fail("Values generated by Mapper not in order");
+        previous = current;
+        ++valueCount;
+      }
+      if (valueCount != 5) { // RandomGenMapper emits 5 values per input key
+        fail("Values not grouped by primary key");
+      }
+      out.collect(key, new Text("success"));
+    }
+
+    public void close() {
+    }
+  }
+  
+  /** The reducer checks whether the input values are in descending order,
+   * whether they are correctly grouped by key (i.e. each call to reduce should
+   * have 5 values), and whether the keys themselves are in descending order.
+   */
+  static class DescendingGroupReducer implements Reducer {
+    
+    public void configure(JobConf job) {
+    }
+
+    // last key seen; instance state, so it persists across reduce() calls
+    private int lastKey = Integer.MAX_VALUE;
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector out,
+                       Reporter reporter) throws IOException {
+      // check key order
+      int currentKey = ((IntWritable)(key)).get();
+      if (currentKey > lastKey) {
+        fail("Keys not in sorted descending order");
+      }
+      lastKey = currentKey;
+      // check order of values
+      IntWritable previous = new IntWritable(Integer.MAX_VALUE); // sentinel: no value compares above it
+      int valueCount = 0;
+      while (values.hasNext()) {
+        IntWritable current = (IntWritable) values.next();
+        
+        // Check that the values are sorted (equal neighbours are accepted)
+        if (current.compareTo(previous) > 0)
+          fail("Values generated by Mapper not in order");
+        previous = current;
+        ++valueCount;
+      }
+      if (valueCount != 5) { // RandomGenMapper emits 5 values per input key
+        fail("Values not grouped by primary key");
+      }
+      out.collect(key, new Text("success"));
+    }
+
+    public void close() {
+    }
+  }
+  
+  /** 
+   * A decreasing Comparator for IntWritable (negates the inherited raw-bytes compare).
+   */ 
+  public static class DecreasingIntComparator extends IntWritable.Comparator {
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return -super.compare(b1, s1, l1, b2, s2, l2);
+    }
+    static {                    // NOTE(review): defines "new Comparator()", not an instance of this class
+      WritableComparator.define(DecreasingIntComparator.class, new Comparator()); // presumably BooleanWritable.Comparator, per the file's import - confirm intent
+    }
+  }
+
+  /** Grouping function for values based on the composite key. This
+   * comparator strips off the secondary key part from the x0y composite
+   * and only compares the primary key value (x).
+   */
+  public static class CompositeIntGroupFn extends WritableComparator {
+    public CompositeIntGroupFn() {
+      super(IntWritable.class);
+    }
+    public int compare (WritableComparable v1, WritableComparable v2) {
+      int val1 = ((IntWritable)(v1)).get() / 100; // integer division drops the secondary key
+      int val2 = ((IntWritable)(v2)).get() / 100;
+      if (val1 < val2)
+        return 1; // NOTE(review): sign is the opposite of natural int order; presumably only the 0 (same group) result matters - confirm
+      else if (val1 > val2)
+        return -1;
+      else
+        return 0;
+    }
+    
+    public boolean equals (IntWritable v1, IntWritable v2) { // NOTE(review): two-argument overload, not Object.equals(Object); confirm whether anything calls it
+      int val1 = v1.get();
+      int val2 = v2.get();
+      
+      return (val1/100) == (val2/100);
+    }
+    
+    static { // NOTE(review): "Comparator" presumably resolves to BooleanWritable.Comparator via the file's import - confirm intent
+      WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
+    }
+  }
+
+  /** Reverse grouping function: negates the compare() result of CompositeIntGroupFn. 
+   */
+  public static class CompositeIntReverseGroupFn extends CompositeIntGroupFn {
+    public int compare (WritableComparable v1, WritableComparable v2) {
+      return -super.compare(v1, v2); // 0 (same primary key) stays 0
+    }
+    
+    public boolean equals (IntWritable v1, IntWritable v2) {
+      return !(super.equals(v1, v2)); // NOTE(review): reports keys with the same primary key as unequal - looks unintended; confirm
+    }
+    
+    static { // NOTE(review): "Comparator" presumably resolves to BooleanWritable.Comparator via the file's import - confirm intent
+      WritableComparator.define(CompositeIntReverseGroupFn.class, new Comparator());
+    }
+  }
+
+
+  public void configure() throws Exception { // shared setup, called at the start of every test method
+    Path testdir = new Path("build/test/test.mapred.spill");
+    Path inDir = new Path(testdir, "in");
+    Path outDir = new Path(testdir, "out");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(testdir); // remove the test directory before recreating it below
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setMapOutputValueClass(IntWritable.class);
+    // set up two map tasks, so we can test merge phase in Reduce also
+    conf.setNumMapTasks(2);
+    
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    // set up input data in 2 files (three IntWritable key/value records each)
+    Path inFile = new Path(inDir, "part0");
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
+        IntWritable.class, IntWritable.class);
+    writer.append(new IntWritable(11), new IntWritable(999));
+    writer.append(new IntWritable(23), new IntWritable(456));
+    writer.append(new IntWritable(10), new IntWritable(780));
+    writer.close();
+    inFile = new Path(inDir, "part1");
+    writer = SequenceFile.createWriter(fs, conf, inFile, 
+        IntWritable.class, IntWritable.class);
+    writer.append(new IntWritable(45), new IntWritable(100));
+    writer.append(new IntWritable(18), new IntWritable(200));
+    writer.append(new IntWritable(27), new IntWritable(300));
+    writer.close();
+    
+    jc = new JobClient(conf);
+  }
+  
+  /**
+   * Test the default comparator for Map/Reduce. 
+   * Use the identity mapper and let AscendingKeysReducer check the key order.
+   * @throws Exception on any error raised during setup or while waiting for the job
+   */
+  public void testDefaultMRComparator() throws Exception { 
+    configure();
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(AscendingKeysReducer.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) { // poll once per second until the job completes
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+  
+  /**
+   * Test user-defined comparator for Map/Reduce.
+   * We provide our own comparator that is the reverse of the default int 
+   * comparator. Keys should be sorted in reverse order in the reducer. 
+   * @throws Exception on any error raised during setup or while waiting for the job
+   */
+  public void testUserMRComparator() throws Exception { 
+    configure();
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(DescendingKeysReducer.class);
+    conf.setOutputKeyComparatorClass(DecreasingIntComparator.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) { // poll once per second until the job completes
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+  
+  /**
+   * Test user-defined grouping comparator for grouping values in Reduce.
+   * We generate composite keys that contain a random number, which acts
+   * as a timestamp associated with the record. In our Reduce function, 
+   * values for a key should be sorted by the 'timestamp'. 
+   * @throws Exception on any error raised during setup or while waiting for the job
+   */
+  public void testUserValueGroupingComparator() throws Exception { 
+    configure();
+    conf.setMapperClass(RandomGenMapper.class);
+    conf.setReducerClass(AscendingGroupReducer.class);
+    conf.setOutputValueGroupingComparator(CompositeIntGroupFn.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) { // poll once per second until the job completes
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+  
+  /**
+   * Test all user comparators. Super-test of all tests here. 
+   * We generate composite keys that contain a random number, which acts
+   * as a timestamp associated with the record. In our Reduce function, 
+   * values for a key should be sorted by the 'timestamp'.
+   * We also provide our own comparators that reverse the default sorting 
+   * order. This lets us make sure that the right comparators are used. 
+   * @throws Exception on any error raised during setup or while waiting for the job
+   */
+  public void testAllUserComparators() throws Exception { 
+    configure();
+    conf.setMapperClass(RandomGenMapper.class);
+    // use a decreasing comparator so keys are sorted in reverse order
+    conf.setOutputKeyComparatorClass(DecreasingIntComparator.class);
+    conf.setReducerClass(DescendingGroupReducer.class);
+    conf.setOutputValueGroupingComparator(CompositeIntReverseGroupFn.class);
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) { // poll once per second until the job completes
+      Thread.sleep(1000);
+    }
+    
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
+  
+}



Mime
View raw message