hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r807908 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/lib/output/ src/test/mapred/org/apache/hadoop/mapreduce/lib/output/
Date Wed, 26 Aug 2009 07:32:25 GMT
Author: sharad
Date: Wed Aug 26 07:32:24 2009
New Revision: 807908

URL: http://svn.apache.org/viewvc?rev=807908&view=rev
Log:
MAPREDUCE-370. Change org.apache.hadoop.mapred.lib.MultipleOutputs and org.apache.hadoop.mapred.lib.MultipleOutputFormat
to use new api. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Aug 26 07:32:24 2009
@@ -259,6 +259,10 @@
     MAPREDUCE-476. Extend DistributedCache to work locally (LocalJobRunner).
     (Philip Zeyliger via tomwhite)
 
+    MAPREDUCE-370. Change org.apache.hadoop.mapred.lib.MultipleOutputs
+    and org.apache.hadoop.mapred.lib.MultipleOutputFormat to use new api.
+    (Amareshwari Sriramadasu via sharad)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
Wed Aug 26 07:32:24 2009
@@ -47,7 +47,10 @@
  * Case three: This class is used for a map only job. The job wants to use an
  * output file name that depends on both the keys and the input file name,
  * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MultipleOutputs} instead
  */
+@Deprecated
 public abstract class MultipleOutputFormat<K, V>
 extends FileOutputFormat<K, V> {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java Wed
Aug 26 07:32:24 2009
@@ -112,7 +112,10 @@
  *
  * }
  * </pre>
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MultipleOutputs} instead
  */
+@Deprecated
 public class MultipleOutputs {
 
   private static final String NAMED_OUTPUTS = "mo.namedOutputs";

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
Wed Aug 26 07:32:24 2009
@@ -29,7 +29,10 @@
 /**
  * This class extends the MultipleOutputFormat, allowing to write the output data 
  * to different output files in sequence file output format. 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MultipleOutputs} instead
  */
+@Deprecated
 public class MultipleSequenceFileOutputFormat <K,V>
 extends MultipleOutputFormat<K, V> {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
Wed Aug 26 07:32:24 2009
@@ -29,7 +29,10 @@
 /**
  * This class extends the MultipleOutputFormat, allowing to write the output
  * data to different output files in Text output format.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MultipleOutputs} instead
  */
+@Deprecated
 public class MultipleTextOutputFormat<K, V>
     extends MultipleOutputFormat<K, V> {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
Wed Aug 26 07:32:24 2009
@@ -42,6 +42,7 @@
   /** Construct output file names so that, when an output directory listing is
    * sorted lexicographically, positions correspond to output partitions.*/
   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  protected static final String PART = "part";
   static {
     NUMBER_FORMAT.setMinimumIntegerDigits(5);
     NUMBER_FORMAT.setGroupingUsed(false);
@@ -108,10 +109,14 @@
     return codecClass;
   }
   
-  public abstract RecordWriter<K, V> 
-     getRecordWriter(TaskAttemptContext job
-                     ) throws IOException, InterruptedException;
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job
+                     ) throws IOException, InterruptedException {
+    return getRecordWriter(job, PART);
+  }
 
+  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext job,
+      String name) throws IOException, InterruptedException;
+ 
   public void checkOutputSpecs(JobContext job
                                ) throws FileAlreadyExistsException, IOException{
     // Ensure that the output directory is set and not already there
@@ -250,10 +255,11 @@
    * @throws IOException
    */
   public Path getDefaultWorkFile(TaskAttemptContext context,
+                                 String name,
                                  String extension) throws IOException{
     FileOutputCommitter committer = 
       (FileOutputCommitter) getOutputCommitter(context);
-    return new Path(committer.getWorkPath(), getUniqueFile(context, "part", 
+    return new Path(committer.getWorkPath(), getUniqueFile(context, name, 
                                                            extension));
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
Wed Aug 26 07:32:24 2009
@@ -45,7 +45,7 @@
     extends FileOutputFormat<WritableComparable<?>, Writable> {
 
   public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
-      TaskAttemptContext context) throws IOException {
+      TaskAttemptContext context, String name) throws IOException {
     Configuration conf = context.getConfiguration();
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
@@ -59,7 +59,7 @@
       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
     }
 
-    Path file = getDefaultWorkFile(context, "");
+    Path file = getDefaultWorkFile(context, name, "");
     FileSystem fs = file.getFileSystem(conf);
     // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java?rev=807908&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
Wed Aug 26 07:32:24 2009
@@ -0,0 +1,441 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * The MultipleOutputs class simplifies writing output data 
+ * to multiple outputs
+ * 
+ * <p> 
+ * Case one: writing to additional outputs other than the job default output.
+ *
+ * Each additional output, or named output, may be configured with its own
+ * <code>OutputFormat</code>, with its own key class and with its own value
+ * class.
+ * 
+ * <p>
+ * Case two: to write data to different files provided by user
+ * </p>
+ * 
+ * <p>
+ * MultipleOutputs supports counters, by default they are disabled. The 
+ * counters group is the {@link MultipleOutputs} class name. The names of the 
+ * counters are the same as the output name. These count the number records 
+ * written to each output name.
+ * </p>
+ * 
+ * Usage pattern for job submission:
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ * FileOutputFormat.setOutputPath(job, outDir);
+ *
+ * job.setMapperClass(MOMap.class);
+ * job.setReducerClass(MOReduce.class);
+ * ...
+ *
+ * // Defines additional single text based output 'text' for the job
+ * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
+ * LongWritable.class, Text.class);
+ *
+ * // Defines additional sequence-file based output 'sequence' for the job
+ * MultipleOutputs.addNamedOutput(job, "seq",
+ *   SequenceFileOutputFormat.class,
+ *   LongWritable.class, Text.class);
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ * <pre>
+ * <K, V> String generateFileName(K k, V v) {
+ *   return k.toString() + "_" + v.toString();
+ * }
+ * 
+ * public class MOReduce extends
+ *   Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
+ * private MultipleOutputs mos;
+ * public void setup(Context context) {
+ * ...
+ * mos = new MultipleOutputs(context);
+ * }
+ *
+ * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
+ * Context context)
+ * throws IOException {
+ * ...
+ * mos.write("text", , key, new Text("Hello"));
+ * mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
+ * mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
+ * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
+ * ...
+ * }
+ *
+ * public void cleanup(Context) throws IOException {
+ * mos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+public class MultipleOutputs<KEYOUT, VALUEOUT> {
+
+  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
+
+  private static final String MO_PREFIX = 
+    "mapreduce.multipleoutputs.namedOutput.";
+
+  private static final String FORMAT = ".format";
+  private static final String KEY = ".key";
+  private static final String VALUE = ".value";
+  private static final String COUNTERS_ENABLED = 
+    "mapreduce.multipleoutputs.counters";
+
+  /**
+   * Counters group used by the counters of MultipleOutputs.
+   */
+  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+
+  /**
+   * Checks if a named output name is valid token.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkTokenName(String namedOutput) {
+    if (namedOutput == null || namedOutput.length() == 0) {
+      throw new IllegalArgumentException(
+        "Name cannot be NULL or emtpy");
+    }
+    for (char ch : namedOutput.toCharArray()) {
+      if ((ch >= 'A') && (ch <= 'Z')) {
+        continue;
+      }
+      if ((ch >= 'a') && (ch <= 'z')) {
+        continue;
+      }
+      if ((ch >= '0') && (ch <= '9')) {
+        continue;
+      }
+      throw new IllegalArgumentException(
+        "Name cannot be have a '" + ch + "' char");
+    }
+  }
+
+  /**
+   * Checks if output name is valid.
+   *
+   * name cannot be the name used for the default output
+   * @param outputPath base output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkBaseOutputPath(String outputPath) {
+    if (outputPath.equals(FileOutputFormat.PART)) {
+      throw new IllegalArgumentException("output name cannot be 'part'");
+    }
+  }
+  
+  /**
+   * Checks if a named output name is valid.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkNamedOutputName(JobContext job,
+      String namedOutput, boolean alreadyDefined) {
+    checkTokenName(namedOutput);
+    checkBaseOutputPath(namedOutput);
+    List<String> definedChannels = getNamedOutputsList(job);
+    if (alreadyDefined && definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' already alreadyDefined");
+    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' not defined");
+    }
+  }
+
+  // Returns list of channel names.
+  private static List<String> getNamedOutputsList(JobContext job) {
+    List<String> names = new ArrayList<String>();
+    StringTokenizer st = new StringTokenizer(
+      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
+    while (st.hasMoreTokens()) {
+      names.add(st.nextToken());
+    }
+    return names;
+  }
+
+  // Returns the named output OutputFormat.
+  @SuppressWarnings("unchecked")
+  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
+    JobContext job, String namedOutput) {
+    return (Class<? extends OutputFormat<?, ?>>)
+      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
+      OutputFormat.class);
+  }
+
+  // Returns the key class for a named output.
+  private static Class<?> getNamedOutputKeyClass(JobContext job,
+                                                String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
+      WritableComparable.class);
+  }
+
+  // Returns the value class for a named output.
+  private static Class<? extends Writable> getNamedOutputValueClass(
+      JobContext job, String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
+      null, Writable.class);
+  }
+
+  /**
+   * Adds a named output for the job.
+   * <p/>
+   *
+   * @param job               job to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  @SuppressWarnings("unchecked")
+  public static void addNamedOutput(Job job, String namedOutput,
+      Class<? extends FileOutputFormat> outputFormatClass,
+      Class<?> keyClass, Class<?> valueClass) {
+    checkNamedOutputName(job, namedOutput, true);
+    Configuration conf = job.getConfiguration();
+    conf.set(MULTIPLE_OUTPUTS,
+      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+      OutputFormat.class);
+    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
+    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
+  }
+
+  /**
+   * Enables or disables counters for the named outputs.
+   * 
+   * The counters group is the {@link MultipleOutputs} class name.
+   * The names of the counters are the same as the named outputs. These
+   * counters count the number records written to each output name.
+   * By default these counters are disabled.
+   *
+   * @param job    job  to enable counters
+   * @param enabled indicates if the counters will be enabled or not.
+   */
+  public static void setCountersEnabled(Job job, boolean enabled) {
+    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
+  }
+
+  /**
+   * Returns if the counters for the named outputs are enabled or not.
+   * By default these counters are disabled.
+   *
+   * @param job    the job 
+   * @return TRUE if the counters are enabled, FALSE if they are disabled.
+   */
+  public static boolean getCountersEnabled(JobContext job) {
+    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
+  }
+
+  /**
+   * Wraps RecordWriter to increment counters. 
+   */
+  @SuppressWarnings("unchecked")
+  private static class RecordWriterWithCounter extends RecordWriter {
+    private RecordWriter writer;
+    private String counterName;
+    private TaskInputOutputContext context;
+
+    public RecordWriterWithCounter(RecordWriter writer, String counterName,
+                                   TaskInputOutputContext context) {
+      this.writer = writer;
+      this.counterName = counterName;
+      this.context = context;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public void write(Object key, Object value) 
+        throws IOException, InterruptedException {
+      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
+      writer.write(key, value);
+    }
+
+    public void close(TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      writer.close(context);
+    }
+  }
+
+  // instance code, to be used from Mapper/Reducer code
+
+  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
+  private Set<String> namedOutputs;
+  private Map<String, RecordWriter<?, ?>> recordWriters;
+  private boolean countersEnabled;
+  
+  /**
+   * Creates and initializes multiple outputs support,
+   * it should be instantiated in the Mapper/Reducer setup method.
+   *
+   * @param context the TaskInputOutputContext object
+   */
+  public MultipleOutputs(
+      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
+    this.context = context;
+    namedOutputs = Collections.unmodifiableSet(
+      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
+    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
+    countersEnabled = getCountersEnabled(context);
+  }
+
+  /**
+   * Write key and value to the namedOutput.
+   *
+   * Output path is a unique file generated for the namedOutput.
+   * For example, {namedOutput}-(m|r)-{part-number}
+   * 
+   * @param namedOutput the named output name
+   * @param key         the key
+   * @param value       the value
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> void write(String namedOutput, K key, V value)
+      throws IOException, InterruptedException {
+    write(namedOutput, key, value, namedOutput);
+  }
+
+  /**
+   * Write key and value to baseOutputPath using the namedOutput.
+   * 
+   * @param namedOutput    the named output name
+   * @param key            the key
+   * @param value          the value
+   * @param baseOutputPath base-output path to write the record to.
+   * Note: Framework will generate unique filename for the baseOutputPath
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> void write(String namedOutput, K key, V value,
+      String baseOutputPath) throws IOException, InterruptedException {
+    checkNamedOutputName(context, namedOutput, false);
+    checkBaseOutputPath(baseOutputPath);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    getRecordWriter(taskContext, baseOutputPath).write(key, value);
+  }
+
+  /**
+   * Write key value to an output file name.
+   * 
+   * Gets the record writer from job's output format.  
+   * Job's output format should be a FileOutputFormat.
+   * 
+   * @param key       the key
+   * @param value     the value
+   * @param baseOutputPath base-output path to write the record to.
+   * Note: Framework will generate unique filename for the baseOutputPath
+   */
+  @SuppressWarnings("unchecked")
+  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
+      throws IOException, InterruptedException {
+    checkBaseOutputPath(baseOutputPath);
+    getRecordWriter(context, baseOutputPath).write(key, value);
+  }
+
+  // by being synchronized MultipleOutputTask can be use with a
+  // MultithreadedMapper.
+  @SuppressWarnings("unchecked")
+  private synchronized RecordWriter getRecordWriter(
+      TaskAttemptContext taskContext, String baseFileName) 
+      throws IOException, InterruptedException {
+    
+    // look for record-writer in the cache
+    RecordWriter writer = recordWriters.get(baseFileName);
+    
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      try {
+        writer = ((FileOutputFormat) ReflectionUtils.newInstance(
+          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
+          .getRecordWriter(taskContext, baseFileName);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+ 
+      // if counters are enabled, wrap the writer with context 
+      // to increment counters 
+      if (countersEnabled) {
+        writer = new RecordWriterWithCounter(writer, baseFileName, context);
+      }
+      
+      // add the record-writer to the cache
+      recordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+   // Create a taskAttemptContext for the named output with 
+   // output format and output key/value types put in the context
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+    // The following trick leverages the instantiation of a record writer via
+    // the job thus supporting arbitrary output formats.
+    Job job = new Job(context.getConfiguration());
+    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
+    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
+    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
+    TaskAttemptContext taskContext = new TaskAttemptContext(
+      job.getConfiguration(), context.getTaskAttemptID());
+    return taskContext;
+  }
+  
+  /**
+   * Closes all the opened outputs.
+   * 
+   * This should be called from cleanup method of map/reduce task.
+   * If overridden subclasses must invoke <code>super.close()</code> at the
+   * end of their <code>close()</code>
+   * 
+   */
+  @SuppressWarnings("unchecked")
+  public void close() throws IOException, InterruptedException {
+    for (RecordWriter writer : recordWriters.values()) {
+      writer.close(context);
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
Wed Aug 26 07:32:24 2009
@@ -127,10 +127,10 @@
   
   @Override 
   public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
-      TaskAttemptContext context) throws IOException {
+      TaskAttemptContext context, String name) throws IOException {
     final SequenceFile.Writer out = getSequenceWriter(context,
       getSequenceFileOutputKeyClass(context),
-      getSequenceFileOutputValueClass(context)); 
+      getSequenceFileOutputValueClass(context), name); 
 
     return new RecordWriter<BytesWritable, BytesWritable>() {
       private WritableValueBytes wvaluebytes = new WritableValueBytes();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
Wed Aug 26 07:32:24 2009
@@ -39,7 +39,7 @@
 public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
 
   protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
-      Class<?> keyClass, Class<?> valueClass) 
+      Class<?> keyClass, Class<?> valueClass, String name) 
       throws IOException {
     Configuration conf = context.getConfiguration();
 	    
@@ -55,7 +55,7 @@
         ReflectionUtils.newInstance(codecClass, conf);
     }
     // get the path of the temporary output file 
-    Path file = getDefaultWorkFile(context, "");
+    Path file = getDefaultWorkFile(context, name, "");
     FileSystem fs = file.getFileSystem(conf);
     return SequenceFile.createWriter(fs, conf, file,
              keyClass,
@@ -66,10 +66,10 @@
   }
   
   public RecordWriter<K, V> 
-         getRecordWriter(TaskAttemptContext context
+         getRecordWriter(TaskAttemptContext context, String name
                          ) throws IOException, InterruptedException {
     final SequenceFile.Writer out = getSequenceWriter(context,
-      context.getOutputKeyClass(), context.getOutputValueClass());
+      context.getOutputKeyClass(), context.getOutputValueClass(), name);
 
     return new RecordWriter<K, V>() {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?rev=807908&r1=807907&r2=807908&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
Wed Aug 26 07:32:24 2009
@@ -108,7 +108,7 @@
   }
 
   public RecordWriter<K, V> 
-         getRecordWriter(TaskAttemptContext job
+         getRecordWriter(TaskAttemptContext job, String name
                          ) throws IOException, InterruptedException {
     Configuration conf = job.getConfiguration();
     boolean isCompressed = getCompressOutput(job);
@@ -122,7 +122,7 @@
       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
       extension = codec.getDefaultExtension();
     }
-    Path file = getDefaultWorkFile(job, extension);
+    Path file = getDefaultWorkFile(job, name, extension);
     FileSystem fs = file.getFileSystem(conf);
     if (!isCompressed) {
       FSDataOutputStream fileOut = fs.create(file, false);

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java?rev=807908&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
Wed Aug 26 07:32:24 2009
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+public class TestMRMultipleOutputs extends HadoopTestCase {
+
+  public TestMRMultipleOutputs() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  public void testWithoutCounters() throws Exception {
+    _testMultipleOutputs(false);
+  }
+
+  public void testWithCounters() throws Exception {
+    _testMultipleOutputs(true);
+  }
+
+  private static String localPathRoot = 
+    System.getProperty("test.build.data", "/tmp");
+  private static final Path ROOT_DIR = new Path(localPathRoot, "testing/mo");
+  private static final Path IN_DIR = new Path(ROOT_DIR, "input");
+  private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
+  private static String TEXT = "text";
+  private static String SEQUENCE = "sequence";
+
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(ROOT_DIR, true);
+  }
+
+  public void tearDown() throws Exception {
+    Configuration conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(ROOT_DIR, true);
+    super.tearDown();
+  }
+
+  protected void _testMultipleOutputs(boolean withCounters) throws Exception {
+    String input = "a\nb\nc\nd\ne\nc\nd\ne";
+
+    Configuration conf = createJobConf();
+    Job job = MapReduceTestUtil.createJob(conf, IN_DIR, OUT_DIR, 2, 1, input);
+
+    job.setJobName("mo");
+    MultipleOutputs.addNamedOutput(job, TEXT, TextOutputFormat.class,
+      LongWritable.class, Text.class);
+    MultipleOutputs.addNamedOutput(job, SEQUENCE,
+      SequenceFileOutputFormat.class, IntWritable.class, Text.class);
+
+    MultipleOutputs.setCountersEnabled(job, withCounters);
+
+    job.setMapperClass(MOMap.class);
+    job.setReducerClass(MOReduce.class);
+
+    job.waitForCompletion(true);
+
+    // assert number of named output part files
+    int namedOutputCount = 0;
+    int valueBasedOutputCount = 0;
+    FileSystem fs = OUT_DIR.getFileSystem(conf);
+    FileStatus[] statuses = fs.listStatus(OUT_DIR);
+    for (FileStatus status : statuses) {
+      String fileName = status.getPath().getName();
+      if (fileName.equals("text-m-00000") ||
+          fileName.equals("text-m-00001") ||
+          fileName.equals("text-r-00000") ||
+          fileName.equals("sequence_A-m-00000") ||
+          fileName.equals("sequence_A-m-00001") ||
+          fileName.equals("sequence_B-m-00000") ||
+          fileName.equals("sequence_B-m-00001") ||
+          fileName.equals("sequence_B-r-00000") ||
+          fileName.equals("sequence_C-r-00000")) {
+        namedOutputCount++;
+      } else if (fileName.equals("a-r-00000") ||
+          fileName.equals("b-r-00000") ||
+          fileName.equals("c-r-00000") ||
+          fileName.equals("d-r-00000") ||
+          fileName.equals("e-r-00000")) {
+        valueBasedOutputCount++;
+      }
+    }
+    assertEquals(9, namedOutputCount);
+    assertEquals(5, valueBasedOutputCount);
+
+    // assert TextOutputFormat files correctness
+    BufferedReader reader = new BufferedReader(
+      new InputStreamReader(fs.open(
+        new Path(FileOutputFormat.getOutputPath(job), "text-r-00000"))));
+    int count = 0;
+    String line = reader.readLine();
+    while (line != null) {
+      assertTrue(line.endsWith(TEXT));
+      line = reader.readLine();
+      count++;
+    }
+    reader.close();
+    assertFalse(count == 0);
+    
+    // assert SequenceOutputFormat files correctness
+    SequenceFile.Reader seqReader =
+      new SequenceFile.Reader(fs, new Path(FileOutputFormat.getOutputPath(job),
+        "sequence_B-r-00000"), conf);
+
+    assertEquals(IntWritable.class, seqReader.getKeyClass());
+    assertEquals(Text.class, seqReader.getValueClass());
+
+    count = 0;
+    IntWritable key = new IntWritable();
+    Text value = new Text();
+    while (seqReader.next(key, value)) {
+      assertEquals(SEQUENCE, value.toString());
+      count++;
+    }
+    seqReader.close();
+    assertFalse(count == 0);
+
+    if (withCounters) {
+      CounterGroup counters =
+        job.getCounters().getGroup(MultipleOutputs.class.getName());
+      assertEquals(9, counters.size());
+      assertEquals(4, counters.findCounter(TEXT).getValue());
+      assertEquals(2, counters.findCounter(SEQUENCE + "_A").getValue());
+      assertEquals(4, counters.findCounter(SEQUENCE + "_B").getValue());
+      assertEquals(2, counters.findCounter(SEQUENCE + "_C").getValue());
+      assertEquals(2, counters.findCounter("a").getValue());
+      assertEquals(2, counters.findCounter("b").getValue());
+      assertEquals(4, counters.findCounter("c").getValue());
+      assertEquals(4, counters.findCounter("d").getValue());
+      assertEquals(4, counters.findCounter("e").getValue());
+    }
+  }
+
+  @SuppressWarnings({"unchecked"})
+  public static class MOMap extends Mapper<LongWritable, Text, LongWritable,
+    Text> {
+
+    private MultipleOutputs mos;
+
+    public void setup(Context context) {
+      mos = new MultipleOutputs(context);
+    }
+
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      context.write(key, value);
+      if (value.toString().equals("a")) {
+        mos.write(TEXT, key, new Text(TEXT));
+        mos.write(SEQUENCE, new IntWritable(1), new Text(SEQUENCE),
+          (SEQUENCE + "_A"));
+        mos.write(SEQUENCE, new IntWritable(2), new Text(SEQUENCE),
+          (SEQUENCE + "_B"));
+      }
+    }
+
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      mos.close();
+    }
+  }
+
+  @SuppressWarnings({"unchecked"})
+  public static class MOReduce extends Reducer<LongWritable, Text,
+    LongWritable, Text> {
+
+    private MultipleOutputs mos;
+    
+    public void setup(Context context) {
+      mos = new MultipleOutputs(context);
+   }
+
+    public void reduce(LongWritable key, Iterable<Text> values, 
+        Context context) throws IOException, InterruptedException {
+      for (Text value : values) {
+        if (!value.toString().equals("b")) {
+          context.write(key, value);
+        } else {
+          mos.write(TEXT, key, new Text(TEXT));
+          mos.write(SEQUENCE, new IntWritable(2), new Text(SEQUENCE),
+            (SEQUENCE + "_B"));
+          mos.write(SEQUENCE, new IntWritable(3), new Text(SEQUENCE),
+            (SEQUENCE + "_C"));
+        }
+        mos.write(key, value, value.toString());
+      }
+    }
+
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      mos.close();
+    }
+  }
+}



Mime
View raw message