mrunit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dbe...@apache.org
Subject svn commit: r1369483 - in /mrunit/trunk/src: main/java/org/apache/hadoop/mrunit/ test/java/org/apache/hadoop/mrunit/ test/java/org/apache/hadoop/mrunit/mapreduce/ test/java/org/apache/hadoop/mrunit/types/
Date Sat, 04 Aug 2012 22:02:14 GMT
Author: dbeech
Date: Sat Aug  4 22:02:14 2012
New Revision: 1369483

URL: http://svn.apache.org/viewvc?rev=1369483&view=rev
Log:
MRUNIT-131: Comparators registered using WritableComparator.define are not used during tests

Added:
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/types/TestWritable.java
Modified:
    mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
    mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java

Modified: mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java?rev=1369483&r1=1369482&r2=1369483&view=diff
==============================================================================
--- mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java (original)
+++ mrunit/trunk/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java Sat Aug  4 22:02:14 2012
@@ -29,6 +29,8 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -165,29 +167,40 @@ public abstract class MapReduceDriverBas
    */
   public List<Pair<K2, List<V2>>> shuffle(final List<Pair<K2, V2>> mapOutputs) {
     
-    // sort the map outputs using the key order comparator (if set)
-    if (keyValueOrderComparator != null) {
-      final Comparator<Pair<K2, V2>> pairKeyComparator = new Comparator<Pair<K2, V2>>() {
-        @Override
-        public int compare(final Pair<K2, V2> o1, final Pair<K2, V2> o2) {
-          return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
-        }
-      };
-      Collections.sort(mapOutputs, pairKeyComparator);
+    final Comparator<K2> keyOrderComparator;
+    final Comparator<K2> keyGroupComparator;
+    
+    if (mapOutputs.isEmpty()) {
+      return Collections.emptyList();
     }
-    else {
-      Collections.sort(mapOutputs, new Pair.FirstElemComparator());
+
+    // JobConf needs the map output key class to work out the
+    // comparator to use
+    JobConf conf = new JobConf(getConfiguration());
+    K2 firstKey = mapOutputs.get(0).getFirst();
+    conf.setMapOutputKeyClass(firstKey.getClass());
+
+    // get the ordering comparator or work out from conf
+    if (keyValueOrderComparator == null) {
+      keyOrderComparator = conf.getOutputKeyComparator();
+    } else {
+      keyOrderComparator = this.keyValueOrderComparator;
     }
     
-    // initialise grouping comparator
-    final Comparator<K2> keyGroupComparator;
+    // get the grouping comparator or work out from conf
     if (this.keyGroupComparator == null) {
-      keyGroupComparator = new JobConf(getConfiguration())
-          .getOutputValueGroupingComparator();
+      keyGroupComparator = conf.getOutputValueGroupingComparator();
     } else {
       keyGroupComparator = this.keyGroupComparator;
     }
-    
+
+    // sort the map outputs according to their keys
+    Collections.sort(mapOutputs, new Comparator<Pair<K2, V2>>() {
+      public int compare(final Pair<K2, V2> o1, final Pair<K2, V2> o2) {
+        return keyOrderComparator.compare(o1.getFirst(), o2.getFirst());
+      }
+    });
+
     // apply grouping comparator to create groups
     final Map<K2, List<Pair<K2, V2>>> groupedByKey = 
         new LinkedHashMap<K2, List<Pair<K2, V2>>>();

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java?rev=1369483&r1=1369482&r2=1369483&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java Sat Aug  4 22:02:14 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapred.lib.Iden
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
 import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.mrunit.types.TestWritable;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -519,6 +521,7 @@ public class TestMapReduceDriver {
         .newMapReduceDriver(new IdentityMapper<Integer, IntWritable>(),
             new IdentityReducer<Integer, IntWritable>())
         .withConfiguration(conf);
+    driver.withKeyOrderComparator(new JavaSerializationComparator<Integer>());
     driver.withKeyGroupingComparator(INTEGER_COMPARATOR);
     driver.withInput(1, new IntWritable(2)).withInput(2, new IntWritable(3));
     driver.withOutput(1, new IntWritable(2)).withOutput(2, new IntWritable(3));
@@ -629,4 +632,34 @@ public class TestMapReduceDriver {
       .runTest(false);
   }
 
+  @Test
+  public void testGroupingComparatorSpecifiedByConf() throws IOException {
+    JobConf conf = new JobConf(new Configuration());
+    conf.setOutputValueGroupingComparator(FirstCharComparator.class);
+    driver.withInput(new Text("A1"),new LongWritable(1L))
+      .withInput(new Text("A2"),new LongWritable(1L))
+      .withInput(new Text("B1"),new LongWritable(1L))
+      .withInput(new Text("B2"),new LongWritable(1L))
+      .withInput(new Text("C1"),new LongWritable(1L))
+      .withOutput(new Text("A1"),new LongWritable(2L))
+      .withOutput(new Text("B1"),new LongWritable(2L))
+      .withOutput(new Text("C1"),new LongWritable(1L))
+      .withConfiguration(conf)
+      .runTest(false);
+  }
+
+  @Test
+  public void testUseOfWritableRegisteredComparator() throws IOException {
+    MapReduceDriver<TestWritable,Text,TestWritable,Text,TestWritable,Text> driver 
+      = MapReduceDriver.newMapReduceDriver(new IdentityMapper(), new IdentityReducer());
+    driver.withInput(new TestWritable("A1"), new Text("A1"))
+      .withInput(new TestWritable("A2"), new Text("A2"))
+      .withInput(new TestWritable("A3"), new Text("A3"))
+      .withKeyGroupingComparator(new TestWritable.SingleGroupComparator())
+      .withOutput(new TestWritable("A3"), new Text("A3"))
+      .withOutput(new TestWritable("A3"), new Text("A2"))
+      .withOutput(new TestWritable("A3"), new Text("A1"))
+      .runTest(true); //ordering is important
+  }
+
 }

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java?rev=1369483&r1=1369482&r2=1369483&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java Sat Aug  4 22:02:14 2012
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
@@ -268,6 +269,7 @@ public class TestPipelineMapReduceDriver
     final JobConf conf = new JobConf();
     conf.setStrings("io.serializations", conf.get("io.serializations"),
         "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
     conf.setOutputValueGroupingComparator(TestMapReduceDriver.INTEGER_COMPARATOR
         .getClass());
     final PipelineMapReduceDriver<Integer, IntWritable, Integer, IntWritable> driver = PipelineMapReduceDriver

Modified: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java?rev=1369483&r1=1369482&r2=1369483&view=diff
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java (original)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java Sat Aug  4 22:02:14 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mrunit.TestMapR
 import org.apache.hadoop.mrunit.mapreduce.TestMapDriver.ConfigurationMapper;
 import org.apache.hadoop.mrunit.mapreduce.TestReduceDriver.ConfigurationReducer;
 import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.mrunit.types.TestWritable;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -465,6 +467,7 @@ public class TestMapReduceDriver {
             new IntSumReducer<Integer>()).withConfiguration(conf);
     driver
         .setKeyGroupingComparator(org.apache.hadoop.mrunit.TestMapReduceDriver.INTEGER_COMPARATOR);
+    driver.setKeyOrderComparator(new JavaSerializationComparator<Integer>());
     driver.withInput(new IntWritable(1), 2).withInput(new IntWritable(2), 3);
     driver.withOutput(2, new IntWritable(1)).withOutput(3, new IntWritable(2))
         .runTest();
@@ -555,4 +558,27 @@ public class TestMapReduceDriver {
       .runTest(false);
   }
 
+  @Test
+  public void testUseOfWritableRegisteredComparator() throws IOException {
+    
+    // this test should use the comparator registered inside TestWritable
+    // to output the keys in reverse order
+    MapReduceDriver<TestWritable,Text,TestWritable,Text,TestWritable,Text> driver 
+      = MapReduceDriver.newMapReduceDriver(new Mapper(), new Reducer());
+    
+    driver.withInput(new TestWritable("A1"), new Text("A1"))
+      .withInput(new TestWritable("A2"), new Text("A2"))
+      .withInput(new TestWritable("A3"), new Text("A3"))
+      .withKeyGroupingComparator(new TestWritable.SingleGroupComparator())
+      // TODO: these output keys are incorrect because of MRUNIT-129 
+      .withOutput(new TestWritable("A3"), new Text("A3"))
+      .withOutput(new TestWritable("A3"), new Text("A2"))
+      .withOutput(new TestWritable("A3"), new Text("A1"))
+      //the following are the actual correct outputs
+      //.withOutput(new TestWritable("A3"), new Text("A3"))
+      //.withOutput(new TestWritable("A2"), new Text("A2"))
+      //.withOutput(new TestWritable("A1"), new Text("A1"))
+      .runTest(true); //ordering is important
+  }
+
 }

Added: mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/types/TestWritable.java
URL: http://svn.apache.org/viewvc/mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/types/TestWritable.java?rev=1369483&view=auto
==============================================================================
--- mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/types/TestWritable.java (added)
+++ mrunit/trunk/src/test/java/org/apache/hadoop/mrunit/types/TestWritable.java Sat Aug  4 22:02:14 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+public class TestWritable implements WritableComparable<TestWritable> {
+
+  private Text value = new Text();
+
+  public TestWritable() {
+
+  }
+
+  public TestWritable(String value) {
+    this.value.set(value);
+  }
+
+  public static class Comparator extends WritableComparator {
+    public Comparator() {
+      super(TestWritable.class, true);
+    }
+
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      TestWritable a1 = (TestWritable) a;
+      TestWritable b1 = (TestWritable) b;
+      return -(a1.value.compareTo(b1.value));
+    }
+  }
+
+  public static class SingleGroupComparator extends WritableComparator {
+    public SingleGroupComparator() {
+      super(TestWritable.class, true);
+    }
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      // force all instances into the same group
+      return 0;
+    }
+  }
+
+  public String getValue() {
+    return value.toString();
+  }
+
+  public void setText(String text) {
+    this.value.set(text);
+  }
+
+  public void readFields(DataInput arg0) throws IOException {
+    value.readFields(arg0);
+  }
+
+  public void write(DataOutput arg0) throws IOException {
+    value.write(arg0);
+  }
+
+  public int compareTo(TestWritable o) {
+    return value.compareTo(o.value);
+  }
+
+  @Override
+  public String toString() {
+    return value.toString();
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Object#hashCode()
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Object#equals(java.lang.Object)
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TestWritable other = (TestWritable) obj;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+
+  static {
+    WritableComparator.define(TestWritable.class, new Comparator());
+  }
+
+}



Mime
View raw message