hadoop-mapreduce-commits mailing list archives

From jo...@apache.org
Subject svn commit: r799356 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java
Date Thu, 30 Jul 2009 16:34:13 GMT
Author: johan
Date: Thu Jul 30 16:34:13 2009
New Revision: 799356

URL: http://svn.apache.org/viewvc?rev=799356&view=rev
Log:
MAPREDUCE-797. Adds combiner support to MRUnit MapReduceDriver. (Aaron Kimball via johan)
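For context, a minimal usage sketch of the new combiner hook, modeled on the tests added in this commit; the class name, keys, and counts below are illustrative only and not part of the patch:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.LongSumReducer;
    import org.apache.hadoop.mrunit.MapReduceDriver;

    public class CombinerSketch {
      public static void main(String[] args) {
        // New three-argument constructor added by this patch: mapper, reducer, combiner.
        MapReduceDriver<Text, LongWritable, Text, LongWritable, Text, LongWritable> driver =
            new MapReduceDriver<Text, LongWritable, Text, LongWritable, Text, LongWritable>(
                new IdentityMapper<Text, LongWritable>(),
                new LongSumReducer<Text>(),
                new LongSumReducer<Text>());   // combiner uses the same sum logic as the reducer

        driver.withInput(new Text("foo"), new LongWritable(1))
              .withInput(new Text("foo"), new LongWritable(2))
              .withOutput(new Text("foo"), new LongWritable(3))  // combined, then reduced, sum
              .runTest();
      }
    }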

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=799356&r1=799355&r2=799356&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 30 16:34:13 2009
@@ -155,6 +155,9 @@
 
     MAPREDUCE-793. Creates a new test that consolidates a few tests to
     include in the commit-test list. (Jothi Padmanabhan via ddas)
+    
+    MAPREDUCE-797. Adds combiner support to MRUnit MapReduceDriver.
+    (Aaron Kimball via johan)    
 
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java?rev=799356&r1=799355&r2=799356&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/MapReduceDriver.java Thu Jul 30 16:34:13 2009
@@ -35,6 +35,7 @@
 
 /**
  * Harness that allows you to test a Mapper and a Reducer instance together
+ * (along with an optional combiner).
  * You provide the input key and value that should be sent to the Mapper, and
  * outputs you expect to be sent by the Reducer to the collector for those
  * inputs. By calling runTest(), the harness will deliver the input to the
@@ -42,6 +43,9 @@
  * them), and will check the Reducer's outputs against the expected results.
  * This is designed to handle a single (k, v)* -> (k, v)* case from the
  * Mapper/Reducer pair, representing a single unit test.
+ *
+ * If a combiner is specified, then it will be run exactly once after
+ * the Mapper and before the Reducer.
  */
 public class MapReduceDriver<K1, V1, K2, V2, K3, V3>
     extends TestDriver<K1, V1, K3, V3> {
@@ -50,6 +54,7 @@
 
   private Mapper<K1, V1, K2, V2> myMapper;
   private Reducer<K2, V2, K3, V3> myReducer;
+  private Reducer<K2, V2, K2, V2> myCombiner;
 
   private List<Pair<K1, V1>> inputList;
 
@@ -60,6 +65,15 @@
     inputList = new ArrayList<Pair<K1, V1>>();
   }
 
+  public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
+                         final Reducer<K2, V2, K3, V3> r,
+                         final Reducer<K2, V2, K2, V2> c) {
+    myMapper = m;
+    myReducer = r;
+    myCombiner = c;
+    inputList = new ArrayList<Pair<K1, V1>>();
+  }
+
   public MapReduceDriver() {
     inputList = new ArrayList<Pair<K1, V1>>();
   }
@@ -111,6 +125,32 @@
   }
 
   /**
+   * Sets the reducer object to use as a combiner for this test
+   * @param c The combiner object to use
+   */
+  public void setCombiner(Reducer<K2, V2, K2, V2> c) {
+    myCombiner = c;
+  }
+
+  /**
+   * Identical to setCombiner(), but with fluent programming style
+   * @param c The Combiner to use
+   * @return this
+   */
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withCombiner(
+          Reducer<K2, V2, K2, V2> c) {
+    setCombiner(c);
+    return this;
+  }
+
+  /**
+   * @return the Combiner object being used for this test
+   */
+  public Reducer<K2, V2, K2, V2> getCombiner() {
+    return myCombiner;
+  }
+
+  /**
    * Adds an input to send to the mapper
    * @param key
    * @param val
@@ -257,6 +297,31 @@
     return this;
   }
 
+  /** The private class to manage starting the reduce phase is used for type
+      genericity reasons. This class is used in the run() method. */
+  private class ReducePhaseRunner<OUTKEY, OUTVAL> {
+    private List<Pair<OUTKEY, OUTVAL>> runReduce(
+        List<Pair<K2, List<V2>>> inputs, Reducer<K2, V2, OUTKEY, OUTVAL> reducer)
+        throws IOException {
+
+      List<Pair<OUTKEY, OUTVAL>> reduceOutputs = new ArrayList<Pair<OUTKEY, OUTVAL>>();
+
+      for (Pair<K2, List<V2>> input : inputs) {
+        K2 inputKey = input.getFirst();
+        List<V2> inputValues = input.getSecond();
+        StringBuilder sb = new StringBuilder();
+        formatValueList(inputValues, sb);
+        LOG.debug("Reducing input (" + inputKey.toString() + ", "
+            + sb.toString() + ")");
+
+        reduceOutputs.addAll(new ReduceDriver<K2, V2, OUTKEY, OUTVAL>(reducer)
+                .withInputKey(inputKey).withInputValues(inputValues).run());
+      }
+
+      return reduceOutputs;
+    }
+  }
+
   public List<Pair<K3, V3>> run() throws IOException {
 
     List<Pair<K2, V2>> mapOutputs = new ArrayList<Pair<K2, V2>>();
@@ -269,22 +334,16 @@
               input).run());
     }
 
-    List<Pair<K2, List<V2>>> reduceInputs = shuffle(mapOutputs);
-    List<Pair<K3, V3>> reduceOutputs = new ArrayList<Pair<K3, V3>>();
-
-    for (Pair<K2, List<V2>> input : reduceInputs) {
-      K2 inputKey = input.getFirst();
-      List<V2> inputValues = input.getSecond();
-      StringBuilder sb = new StringBuilder();
-      formatValueList(inputValues, sb);
-      LOG.debug("Reducing input (" + inputKey.toString() + ", "
-          + sb.toString() + ")");
-
-      reduceOutputs.addAll(new ReduceDriver<K2, V2, K3, V3>(myReducer)
-              .withInputKey(inputKey).withInputValues(inputValues).run());
+    if (myCombiner != null) {
+      // User has specified a combiner. Run this and replace the mapper outputs
+      // with the result of the combiner.
+      LOG.debug("Starting combine phase with combiner: " + myCombiner);
+      mapOutputs = new ReducePhaseRunner<K2, V2>().runReduce(shuffle(mapOutputs), myCombiner);
     }
 
-    return reduceOutputs;
+    // Run the reduce phase.
+    LOG.debug("Starting reduce phase with reducer: " + myReducer);
+    return new ReducePhaseRunner<K3, V3>().runReduce(shuffle(mapOutputs), myReducer);
   }
 
   @Override
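
To illustrate the reworked run() path above (map outputs are shuffled, combined exactly once if a combiner is set, then shuffled again into the reducer), here is a small hypothetical snippet that drives the pipeline with run() and inspects the resulting pairs directly. It assumes the pre-existing withMapper()/withReducer() fluent setters on MapReduceDriver, which are not shown in this diff:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.LongSumReducer;
    import org.apache.hadoop.mrunit.MapReduceDriver;
    import org.apache.hadoop.mrunit.types.Pair;

    public class RunWithCombinerSketch {
      public static void main(String[] args) throws IOException {
        List<Pair<Text, LongWritable>> outputs =
            new MapReduceDriver<Text, LongWritable, Text, LongWritable, Text, LongWritable>()
                .withMapper(new IdentityMapper<Text, LongWritable>())
                .withReducer(new LongSumReducer<Text>())
                .withCombiner(new LongSumReducer<Text>())   // new in this commit
                .withInput(new Text("k"), new LongWritable(4))
                .withInput(new Text("k"), new LongWritable(6))
                .run();                                     // returns reducer outputs instead of asserting

        for (Pair<Text, LongWritable> p : outputs) {
          System.out.println(p.getFirst() + "\t" + p.getSecond()); // expected: k 10
        }
      }
    }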

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java?rev=799356&r1=799355&r2=799356&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/test/org/apache/hadoop/mrunit/TestMapReduceDriver.java Thu Jul 30 16:34:13 2009
@@ -31,6 +31,7 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.junit.Before;
@@ -233,5 +234,45 @@
     assertListEquals(expected, outputs);
   }
 
+  // Test "combining" with an IdentityReducer. Result should be the same.
+  @Test
+  public void testIdentityCombiner() {
+    driver
+            .withCombiner(new IdentityReducer<Text, LongWritable>())
+            .withInput(new Text("foo"), new LongWritable(FOO_IN_A))
+            .withInput(new Text("foo"), new LongWritable(FOO_IN_B))
+            .withInput(new Text("bar"), new LongWritable(BAR_IN))
+            .withOutput(new Text("bar"), new LongWritable(BAR_IN))
+            .withOutput(new Text("foo"), new LongWritable(FOO_OUT))
+            .runTest();
+  }
+
+  // Test "combining" with another LongSumReducer. Result should be the same.
+  @Test
+  public void testLongSumCombiner() {
+    driver
+            .withCombiner(new LongSumReducer<Text>())
+            .withInput(new Text("foo"), new LongWritable(FOO_IN_A))
+            .withInput(new Text("foo"), new LongWritable(FOO_IN_B))
+            .withInput(new Text("bar"), new LongWritable(BAR_IN))
+            .withOutput(new Text("bar"), new LongWritable(BAR_IN))
+            .withOutput(new Text("foo"), new LongWritable(FOO_OUT))
+            .runTest();
+  }
+
+  // Test "combining" with another LongSumReducer, and with the Reducer
+  // set to IdentityReducer. Result should be the same.
+  @Test
+  public void testLongSumCombinerAndIdentityReduce() {
+    driver
+            .withCombiner(new LongSumReducer<Text>())
+            .withReducer(new IdentityReducer<Text, LongWritable>())
+            .withInput(new Text("foo"), new LongWritable(FOO_IN_A))
+            .withInput(new Text("foo"), new LongWritable(FOO_IN_B))
+            .withInput(new Text("bar"), new LongWritable(BAR_IN))
+            .withOutput(new Text("bar"), new LongWritable(BAR_IN))
+            .withOutput(new Text("foo"), new LongWritable(FOO_OUT))
+            .runTest();
+  }
 }
 


