hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r1076108 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/lib/output/ src/test/mapred/org/apache/hadoop/mapred/lib/ src/test/mapred/org/apache/hadoop/mapreduce/lib/output/
Date Wed, 02 Mar 2011 05:31:22 GMT
Author: tomwhite
Date: Wed Mar  2 05:31:22 2011
New Revision: 1076108

URL: http://svn.apache.org/viewvc?rev=1076108&view=rev
Log:
MAPREDUCE-2225. MultipleOutputs should not require the use of 'Writable'. Contributed by Harsh J Chouraria.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1076108&r1=1076107&r2=1076108&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Mar  2 05:31:22 2011
@@ -40,6 +40,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-2206. The task-cleanup tasks should be optional. (schen)
 
+    MAPREDUCE-2225. MultipleOutputs should not require the use of 'Writable'.
+    (Harsh J Chouraria via tomwhite)
+
   OPTIMIZATIONS
     
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java?rev=1076108&r1=1076107&r2=1076108&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java Wed Mar  2 05:31:22 2011
@@ -20,8 +20,6 @@ package org.apache.hadoop.mapred.lib;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;
 
@@ -253,11 +251,11 @@ public class MultipleOutputs {
    * @param namedOutput named output
    * @return class for the named output key
    */
-  public static Class<? extends WritableComparable> getNamedOutputKeyClass(JobConf
conf,
+  public static Class<?> getNamedOutputKeyClass(JobConf conf,
                                                 String namedOutput) {
     checkNamedOutput(conf, namedOutput, false);
     return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
-	WritableComparable.class);
+	Object.class);
   }
 
   /**
@@ -267,11 +265,11 @@ public class MultipleOutputs {
    * @param namedOutput named output
    * @return class of named output value
    */
-  public static Class<? extends Writable> getNamedOutputValueClass(JobConf conf,
+  public static Class<?> getNamedOutputValueClass(JobConf conf,
                                                   String namedOutput) {
     checkNamedOutput(conf, namedOutput, false);
     return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
-      Writable.class);
+      Object.class);
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java?rev=1076108&r1=1076107&r2=1076108&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java Wed Mar  2 05:31:22 2011
@@ -20,8 +20,6 @@ package org.apache.hadoop.mapreduce.lib.
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -218,14 +216,14 @@ public class MultipleOutputs<KEYOUT, VAL
   private static Class<?> getNamedOutputKeyClass(JobContext job,
                                                 String namedOutput) {
     return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
-      WritableComparable.class);
+      Object.class);
   }
 
   // Returns the value class for a named output.
-  private static Class<? extends Writable> getNamedOutputValueClass(
+  private static Class<?> getNamedOutputValueClass(
       JobContext job, String namedOutput) {
     return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
-      null, Writable.class);
+      null, Object.class);
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java?rev=1076108&r1=1076107&r2=1076108&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java Wed Mar  2 05:31:22 2011
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
 import org.apache.hadoop.mapred.*;
 
 import java.io.BufferedReader;
@@ -39,10 +40,12 @@ public class TestMultipleOutputs extends
 
   public void testWithoutCounters() throws Exception {
     _testMultipleOutputs(false);
+    _testMOWithJavaSerialization(false);
   }
 
   public void testWithCounters() throws Exception {
     _testMultipleOutputs(true);
+    _testMOWithJavaSerialization(true);
   }
 
   private static final Path ROOT_DIR = new Path("testing/mo");
@@ -80,6 +83,94 @@ public class TestMultipleOutputs extends
     fs.delete(rootDir, true);
     super.tearDown();
   }
+  
+  protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
+    // Runs a job whose map/reduce types (Long, String) are not Writable and
+    // checks the "text" named output files and counters it produces.
+    Path inDir = getDir(IN_DIR);
+    Path outDir = getDir(OUT_DIR);
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    // _testMultipleOutputs runs first in each test method and uses the same
+    // dirs, so clear them before (not after) writing this job's input.
+    fs.delete(inDir, true);
+    fs.delete(outDir, true);
+
+    // One input file: the counter check below expects one "a" line (sent to
+    // "text" by the map) and one "b" line (sent to "text" by the reduce).
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    conf.setJobName("mo");
+    // Register JavaSerialization so the non-Writable types can be serialized.
+    conf.set("io.serializations",
+    "org.apache.hadoop.io.serializer.JavaSerialization," +
+    "org.apache.hadoop.io.serializer.WritableSerialization");
+
+    conf.setInputFormat(TextInputFormat.class);
+    // Intermediate and final key/value types are plain Java types.
+    conf.setMapOutputKeyClass(Long.class);
+    conf.setMapOutputValueClass(String.class);
+    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
+
+    conf.setOutputKeyClass(Long.class);
+    conf.setOutputValueClass(String.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    // The named output declares the same non-Writable key/value classes.
+    MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
+      Long.class, String.class);
+
+    MultipleOutputs.setCountersEnabled(conf, withCounters);
+
+    conf.setMapperClass(MOJavaSerDeMap.class);
+    conf.setReducerClass(MOJavaSerDeReduce.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    // runJob blocks until the job completes and throws IOException if it
+    // fails, so a failed job is reported directly instead of surfacing as
+    // a mismatch in the output assertions below.
+    RunningJob job = JobClient.runJob(conf);
+
+    // Expect one map-side (text-m-00000) and one reduce-side (text-r-00000)
+    // named output file.
+    int namedOutputCount = 0;
+    FileStatus[] statuses = fs.listStatus(outDir);
+    for (FileStatus status : statuses) {
+      if (status.getPath().getName().equals("text-m-00000") ||
+        status.getPath().getName().equals("text-r-00000")) {
+        namedOutputCount++;
+      }
+    }
+    assertEquals(2, namedOutputCount);
+
+    // Every line of the reduce-side named output should end with "text".
+    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(
+        new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000"))));
+    int count = 0;
+    try {
+      for (String line; (line = reader.readLine()) != null; count++) {
+        assertTrue(line.endsWith("text"));
+      }
+    } finally {
+      reader.close();
+    }
+    assertTrue("text-r-00000 is empty", count > 0);
+
+    // The counter group is expected to be empty unless counters are enabled.
+    Counters.Group counters =
+      job.getCounters().getGroup(MultipleOutputs.class.getName());
+    if (!withCounters) {
+      assertEquals(0, counters.size());
+    } else {
+      // one "a" line from the map plus one "b" line from the reduce
+      assertEquals(1, counters.size());
+      assertEquals(2, counters.getCounter("text"));
+    }
+  }
 
   protected void _testMultipleOutputs(boolean withCounters) throws Exception {
     Path inDir = getDir(IN_DIR);
@@ -106,8 +197,6 @@ public class TestMultipleOutputs extends
     conf.setMapOutputValueClass(Text.class);
 
     conf.setOutputFormat(TextOutputFormat.class);
-    conf.setOutputKeyClass(LongWritable.class);
-    conf.setOutputValueClass(Text.class);
 
     MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
       LongWritable.class, Text.class);
@@ -256,5 +345,61 @@ public class TestMultipleOutputs extends
       mos.close();
     }
   }
+  
+  @SuppressWarnings({"unchecked"})
+  public static class MOJavaSerDeMap implements Mapper<LongWritable, Text, Long,
+    String> {
+    // Lines equal to "a" go to the "text" named output, all others to the job
+    // output; both receive Long keys and String values rather than Writables.
+    private MultipleOutputs mos;
+    public void configure(JobConf conf) {
+      mos = new MultipleOutputs(conf);
+    }
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<Long, String> output,
+                    Reporter reporter)
+      throws IOException {
+      if (!value.toString().equals("a")) {
+        output.collect(key.get(), value.toString());
+      } else {
+        mos.getCollector("text", reporter).collect(key.get(), "text");
+      }
+    }
+
+    public void close() throws IOException {
+      mos.close();
+    }
+  }
+
+  @SuppressWarnings({"unchecked"})
+  public static class MOJavaSerDeReduce implements Reducer<Long, String,
+    Long, String> {
+    // A value equal to "b" becomes a "text" record on the named output;
+    // any other value is passed to the job output under the same key.
+    private MultipleOutputs multipleOutputs;
+    public void configure(JobConf conf) {
+      multipleOutputs = new MultipleOutputs(conf);
+    }
+
+    public void reduce(Long key, Iterator<String> values,
+                       OutputCollector<Long, String> output,
+                       Reporter reporter)
+      throws IOException {
+      while (values.hasNext()) {
+        String current = values.next();
+        if (current.equals("b")) {
+          multipleOutputs.getCollector("text", reporter).collect(key, "text");
+        } else {
+          output.collect(key, current);
+        }
+      }
+    }
+
+    public void close() throws IOException {
+      multipleOutputs.close();
+    }
+  }
+
 
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java?rev=1076108&r1=1076107&r2=1076108&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java Wed Mar  2 05:31:22 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
 import org.apache.hadoop.mapred.HadoopTestCase;
 import org.apache.hadoop.mapreduce.*;
 
@@ -40,10 +41,12 @@ public class TestMRMultipleOutputs exten
 
   public void testWithoutCounters() throws Exception {
     _testMultipleOutputs(false);
+    _testMOWithJavaSerialization(false);
   }
 
   public void testWithCounters() throws Exception {
     _testMultipleOutputs(true);
+    _testMOWithJavaSerialization(true);
   }
 
   private static String localPathRoot = 
@@ -67,6 +70,84 @@ public class TestMRMultipleOutputs exten
     fs.delete(ROOT_DIR, true);
     super.tearDown();
   }
+  
+  protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
+    // Runs a job whose map/reduce types (Long, String) are not Writable and
+    // checks both the named ("text") and value-based output files.
+    String input = "a\nb\nc\nd\ne\nc\nd\ne";
+    Configuration conf = createJobConf();
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization," +
+        "org.apache.hadoop.io.serializer.WritableSerialization");
+    Job job = MapReduceTestUtil.createJob(conf, IN_DIR, OUT_DIR, 2, 1, input);
+
+    job.setJobName("mo");
+    MultipleOutputs.addNamedOutput(job, TEXT, TextOutputFormat.class,
+      Long.class, String.class);
+
+    MultipleOutputs.setCountersEnabled(job, withCounters);
+    // Keys and values are plain Java types (Long, String), not Writables.
+    job.setSortComparatorClass(JavaSerializationComparator.class);
+
+    job.setMapOutputKeyClass(Long.class);
+    job.setMapOutputValueClass(String.class);
+
+    job.setOutputKeyClass(Long.class);
+    job.setOutputValueClass(String.class);
+
+    job.setMapperClass(MOJavaSerDeMap.class);
+    job.setReducerClass(MOJavaSerDeReduce.class);
+
+    assertTrue("job failed", job.waitForCompletion(true));
+
+    // count named-output files and value-based (<value>-r-00000) files
+    int namedOutputCount = 0;
+    int valueBasedOutputCount = 0;
+    FileSystem fs = OUT_DIR.getFileSystem(conf);
+    FileStatus[] statuses = fs.listStatus(OUT_DIR);
+    for (FileStatus status : statuses) {
+      String fileName = status.getPath().getName();
+      if (fileName.equals("text-m-00000") ||
+          fileName.equals("text-m-00001") ||
+          fileName.equals("text-r-00000")) {
+        namedOutputCount++;
+      } else if (fileName.equals("a-r-00000") ||
+          fileName.equals("b-r-00000") ||
+          fileName.equals("c-r-00000") ||
+          fileName.equals("d-r-00000") ||
+          fileName.equals("e-r-00000")) {
+        valueBasedOutputCount++;
+      }
+    }
+    assertEquals(3, namedOutputCount);
+    assertEquals(5, valueBasedOutputCount);
+
+    // Every line of the reduce-side named output should end with TEXT.
+    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(
+        new Path(FileOutputFormat.getOutputPath(job), "text-r-00000"))));
+    int count = 0;
+    try {
+      for (String line; (line = reader.readLine()) != null; count++) {
+        assertTrue(line.endsWith(TEXT));
+      }
+    } finally {
+      reader.close();
+    }
+    assertTrue("text-r-00000 is empty", count > 0);
+
+    // Counter names are the named output plus each baseOutputPath (a..e).
+    if (withCounters) {
+      CounterGroup counters =
+        job.getCounters().getGroup(MultipleOutputs.class.getName());
+      assertEquals(6, counters.size());
+      assertEquals(4, counters.findCounter(TEXT).getValue());
+      assertEquals(2, counters.findCounter("a").getValue());
+      assertEquals(2, counters.findCounter("b").getValue());
+      assertEquals(4, counters.findCounter("c").getValue());
+      assertEquals(4, counters.findCounter("d").getValue());
+      assertEquals(4, counters.findCounter("e").getValue());
+    }
+  }
 
   protected void _testMultipleOutputs(boolean withCounters) throws Exception {
     String input = "a\nb\nc\nd\ne\nc\nd\ne";
@@ -163,7 +244,7 @@ public class TestMRMultipleOutputs exten
     }
   }
 
-  @SuppressWarnings({"unchecked"})
+  @SuppressWarnings("unchecked")
   public static class MOMap extends Mapper<LongWritable, Text, LongWritable,
     Text> {
 
@@ -191,7 +272,7 @@ public class TestMRMultipleOutputs exten
     }
   }
 
-  @SuppressWarnings({"unchecked"})
+  @SuppressWarnings("unchecked")
   public static class MOReduce extends Reducer<LongWritable, Text,
     LongWritable, Text> {
 
@@ -222,4 +303,54 @@ public class TestMRMultipleOutputs exten
       mos.close();
     }
   }
+
+  public static class MOJavaSerDeMap extends Mapper<LongWritable, Text, Long,
+    String> {
+    // Writes every line to the job output and, for "a", also to TEXT.
+    private MultipleOutputs<Long, String> mos;
+    @Override
+    public void setup(Context context) {
+      mos = new MultipleOutputs<Long, String>(context);
+    }
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      context.write(key.get(), value.toString());
+      if (value.toString().equals("a")) {
+        mos.write(TEXT, key.get(), TEXT);
+      }
+    }
+    @Override
+    public void cleanup(Context context)
+        throws IOException, InterruptedException {
+      mos.close();
+    }
+  }
+
+  public static class MOJavaSerDeReduce extends Reducer<Long, String,
+    Long, String> {
+    // value -> "<value>" base output; then "b" -> TEXT, else -> job output.
+    private MultipleOutputs<Long, String> mos;
+    @Override
+    public void setup(Context context) {
+      mos = new MultipleOutputs<Long, String>(context);
+    }
+    @Override
+    public void reduce(Long key, Iterable<String> values,
+        Context context) throws IOException, InterruptedException {
+      for (String value : values) {
+        mos.write(key, value, value);
+        if (!value.equals("b")) {
+          context.write(key, value);
+        } else {
+          mos.write(TEXT, key, TEXT); // a String, matching the declared class
+        }
+      }
+    }
+    @Override
+    public void cleanup(Context context)
+        throws IOException, InterruptedException {
+      mos.close();
+    }
+  }
 }



Mime
View raw message