avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1349492 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/mapreduce/ lang/java/mapred/src/test/java/org/apache/avro/mapreduce/
Date Tue, 12 Jun 2012 19:46:29 GMT
Author: cutting
Date: Tue Jun 12 19:46:29 2012
New Revision: 1349492

URL: http://svn.apache.org/viewvc?rev=1349492&view=rev
Log:
AVRO-1106. Java: Add AvroMultipleOutputs for newer mapreduce API.  Contributed by Ashish Nagavaram.

Added:
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
  (with props)
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
  (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1349492&r1=1349491&r2=1349492&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jun 12 19:46:29 2012
@@ -4,6 +4,9 @@ Avro 1.7.1 (unreleased)
 
   NEW FEATURES
 
+    AVRO-1106. Java: Add AvroMultipleOutputs for newer mapreduce API.
+    (Ashish Nagavaram via cutting)
+
   IMPROVEMENTS
 
   BUG FIXES

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java?rev=1349492&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
(added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
Tue Jun 12 19:46:29 2012
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.List;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.avro.Schema;
+
+/**
+ * The AvroMultipleOutputs class simplifies writing Avro output data 
+ * to multiple outputs
+ * 
+ * <p> 
+ * Case one: writing to additional outputs other than the job default output.
+ *
+ * Each additional output, or named output, may be configured with its own
+ * <code>Schema</code> and <code>OutputFormat</code>.
+ * </p>
+ * <p>
+ * Case two: to write data to different files provided by user
+ * </p>
+ * 
+ * <p>
+ * AvroMultipleOutputs supports counters, by default they are disabled. The 
+ * counters group is the {@link AvroMultipleOutputs} class name. The names of the 
+ * counters are the same as the output name. These count the number of records 
+ * written to each output name. 
+ * </p>
+ * 
+ * Usage pattern for job submission:
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ * FileOutputFormat.setOutputPath(job, outDir);
+ *
+ * job.setMapperClass(MyAvroMapper.class);
+ * job.setReducerClass(MyAvroReducer.class);
+ * ...
+ *  
+ * Schema schema;
+ * ...
+ * // Defines additional single output 'avro1' for the job
+ * AvroMultipleOutputs.addNamedOutput(job, "avro1", AvroKeyValueOutputFormat.class,
+ * keyschema, valueSchema);  // valueSchema can be set to null if there is only
+ *                           // a Key to be written to the file in the RecordWriter
+ *
+ * // Defines additional output 'avro2' with different schema for the job
+ * AvroMultipleOutputs.addNamedOutput(job, "avro2",
+ *   AvroKeyOutputFormat.class,
+ *   schema,null); 
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ * <pre>
+ * 
+ * public class MyAvroReducer extends
+ *   Reducer&lt;K, V, T, NullWritable&gt; {
+ * private MultipleOutputs amos;
+ *
+ *
+ * public void setup(Context context) {
+ * ...
+ * amos = new AvroMultipleOutputs(context);
+ * }
+ *
+ * public void reduce(K, Iterator&lt;V&gt; values,Context context)
+ * throws IOException {
+ * ...
+ * amos.write("avro1",datum,NullWritable.get());
+ * amos.write("avro2",datum,NullWritable.get());
+ * amos.getCollector("avro3",datum); // here the value is taken as NullWritable
+ * ...
+ * }
+ *
+ * public void cleanup(Context context) throws IOException {
+ * amos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+
+
+public class AvroMultipleOutputs{
+
+  private static final String MULTIPLE_OUTPUTS = "avro.mapreduce.multipleoutputs";
+
+  private static final String MO_PREFIX = 
+    "avro.mapreduce.multipleoutputs.namedOutput.";
+
+  private static final String FORMAT = ".format";
+  private static final String COUNTERS_ENABLED = 
+    "avro.mapreduce.multipleoutputs.counters";
+
+  /**
+   * Counters group used by the counters of MultipleOutputs.
+   */
+  private static final String COUNTERS_GROUP = AvroMultipleOutputs.class.getName();
+  
+  /**
+   * Cache for the taskContexts
+   */
+  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
+
+  /**
+   * Cache for the Key Schemas
+   */
+  private static Map<String, Schema> keySchemas = new HashMap<String, Schema>();
+
+  /**
+   * Cache for the Value Schemas
+   */
+  private static Map<String, Schema> valSchemas = new HashMap<String, Schema>();
+
+  /**
+   * Checks if a named output name is valid token.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkTokenName(String namedOutput) {
+    if (namedOutput == null || namedOutput.length() == 0) {
+      throw new IllegalArgumentException(
+        "Name cannot be NULL or emtpy");
+    }
+    for (char ch : namedOutput.toCharArray()) {
+      if ((ch >= 'A') && (ch <= 'Z')) {
+        continue;
+      }
+      if ((ch >= 'a') && (ch <= 'z')) {
+        continue;
+      }
+      if ((ch >= '0') && (ch <= '9')) {
+        continue;
+      }
+      throw new IllegalArgumentException(
+        "Name cannot be have a '" + ch + "' char");
+    }
+  }
+
+  /**
+   * Checks if output name is valid.
+   *
+   * name cannot be the name used for the default output
+   * @param outputPath base output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkBaseOutputPath(String outputPath) {
+    if (outputPath.equals("part")) {
+      throw new IllegalArgumentException("output name cannot be 'part'");
+    }
+  }
+  
+  /**
+   * Checks if a named output name is valid.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkNamedOutputName(JobContext job,
+      String namedOutput, boolean alreadyDefined) {
+    checkTokenName(namedOutput);
+    checkBaseOutputPath(namedOutput);
+    List<String> definedChannels = getNamedOutputsList(job);
+    if (alreadyDefined && definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' already alreadyDefined");
+    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' not defined");
+    }
+  }
+
+  // Returns list of channel names.
+  private static List<String> getNamedOutputsList(JobContext job) {
+    List<String> names = new ArrayList<String>();
+    StringTokenizer st = new StringTokenizer(
+      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
+    while (st.hasMoreTokens()) {
+      names.add(st.nextToken());
+    }
+    return names;
+  }
+
+  // Returns the named output OutputFormat.
+  @SuppressWarnings("unchecked")
+  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
+    JobContext job, String namedOutput) {
+    return (Class<? extends OutputFormat<?, ?>>)
+      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
+      OutputFormat.class);
+  }
+
+ /**
+   * Adds a named output for the job.
+   * <p/>
+   *
+   * @param job               job to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keySchema          Schema for the Key
+   */
+  @SuppressWarnings("unchecked")
+  public static void addNamedOutput(Job job, String namedOutput,
+      Class<? extends OutputFormat> outputFormatClass,
+      Schema keySchema) {
+      addNamedOutput(job,namedOutput,outputFormatClass,keySchema,null);
+  }
+
+  /**
+   * Adds a named output for the job.
+   * <p/>
+   *
+   * @param job               job to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keySchema          Schema for the Key
+   * @param valueSchema        Schema for the Value (used in case of AvroKeyValueOutputFormat, or null)
+   */
+  @SuppressWarnings("unchecked")
+  public static void addNamedOutput(Job job, String namedOutput,
+      Class<? extends OutputFormat> outputFormatClass,
+      Schema keySchema, Schema valueSchema) {
+    checkNamedOutputName(job, namedOutput, true);
+    Configuration conf = job.getConfiguration();
+    conf.set(MULTIPLE_OUTPUTS,
+      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+      OutputFormat.class);
+    keySchemas.put(namedOutput+"_KEYSCHEMA",keySchema);
+    valSchemas.put(namedOutput+"_VALSCHEMA",valueSchema);
+  
+  }
+
+  /**
+   * Enables or disables counters for the named outputs.
+   * 
+   * The counters group is the {@link AvroMultipleOutputs} class name.
+   * The names of the counters are the same as the named outputs. These
+   * counters count the number records written to each output name.
+   * By default these counters are disabled.
+   *
+   * @param job    job  to enable counters
+   * @param enabled indicates if the counters will be enabled or not.
+   */
+  public static void setCountersEnabled(Job job, boolean enabled) {
+    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
+  }
+
+  /**
+   * Returns if the counters for the named outputs are enabled or not.
+   * By default these counters are disabled.
+   *
+   * @param job    the job 
+   * @return TRUE if the counters are enabled, FALSE if they are disabled.
+   */
+  public static boolean getCountersEnabled(JobContext job) {
+    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
+  }
+
+  /**
+   * Wraps RecordWriter to increment counters. 
+   */
+  @SuppressWarnings("unchecked")
+  private static class RecordWriterWithCounter extends RecordWriter {
+    private RecordWriter writer;
+    private String counterName;
+    private TaskInputOutputContext context;
+
+    public RecordWriterWithCounter(RecordWriter writer, String counterName,
+                                   TaskInputOutputContext context) {
+      this.writer = writer;
+      this.counterName = counterName;
+      this.context = context;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public void write(Object key, Object value) 
+        throws IOException, InterruptedException {
+      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
+      writer.write(key, value);
+    }
+
+    public void close(TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      writer.close(context);
+    }
+  }
+
+  // instance code, to be used from Mapper/Reducer code
+
+  private TaskInputOutputContext<?, ?, ?, ?> context;
+  private Set<String> namedOutputs;
+  private Map<String, RecordWriter<?, ?>> recordWriters;
+  private boolean countersEnabled;
+  
+  /**
+   * Creates and initializes multiple outputs support,
+   * it should be instantiated in the Mapper/Reducer setup method.
+   *
+   * @param context the TaskInputOutputContext object
+   */
+  public AvroMultipleOutputs(
+      TaskInputOutputContext<?, ?, ?, ?> context) {
+    this.context = context;
+    namedOutputs = Collections.unmodifiableSet(
+      new HashSet<String>(AvroMultipleOutputs.getNamedOutputsList(context)));
+    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
+    countersEnabled = getCountersEnabled(context);
+  }
+
+  /**
+   * Write key and value to the namedOutput.
+   *
+   * Output path is a unique file generated for the namedOutput.
+   * For example, {namedOutput}-(m|r)-{part-number}
+   * 
+   * @param namedOutput the named output name
+   * @param key         the key , value is NullWritable
+   */
+  @SuppressWarnings("unchecked")
+  public void write(String namedOutput, Object key)
+      throws IOException, InterruptedException {
+    write(namedOutput, key, NullWritable.get(), namedOutput);
+  }
+
+
+
+  /**
+   * Write key and value to the namedOutput.
+   *
+   * Output path is a unique file generated for the namedOutput.
+   * For example, {namedOutput}-(m|r)-{part-number}
+   * 
+   * @param namedOutput the named output name
+   * @param key         the key
+   * @param value       the value
+   */
+  @SuppressWarnings("unchecked")
+  public void write(String namedOutput, Object key, Object value)
+      throws IOException, InterruptedException {
+    write(namedOutput, key, value, namedOutput);
+  }
+
+  /**
+   * Write key and value to baseOutputPath using the namedOutput.
+   * 
+   * @param namedOutput    the named output name
+   * @param key            the key
+   * @param value          the value
+   * @param baseOutputPath base-output path to write the record to.
+   * Note: Framework will generate unique filename for the baseOutputPath
+   */
+  @SuppressWarnings("unchecked")
+  public void write(String namedOutput, Object key, Object value,
+      String baseOutputPath) throws IOException, InterruptedException {
+    checkNamedOutputName(context, namedOutput, false);
+    checkBaseOutputPath(baseOutputPath);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    getRecordWriter(taskContext, baseOutputPath).write(key, value);
+  }
+
+  /**
+   * Write key value to an output file name.
+   * 
+   * Gets the record writer from job's output format.  
+   * Job's output format should be a FileOutputFormat.
+   * 
+   * @param key       the key
+   * @param value     the value
+   * @param baseOutputPath base-output path to write the record to.
+   * Note: Framework will generate unique filename for the baseOutputPath
+   */
+  @SuppressWarnings("unchecked")
+  public void write(Object key, Object value, String baseOutputPath) 
+      throws IOException, InterruptedException {
+    checkBaseOutputPath(baseOutputPath);
+    TaskAttemptContext taskContext = new TaskAttemptContext(
+      context.getConfiguration(), context.getTaskAttemptID());
+    getRecordWriter(taskContext, baseOutputPath).write(key, value);
+  }
+
+  // by being synchronized MultipleOutputTask can be use with a
+  // MultithreadedMapper.
+  @SuppressWarnings("unchecked")
+  private synchronized RecordWriter getRecordWriter(
+      TaskAttemptContext taskContext, String baseFileName) 
+      throws IOException, InterruptedException {
+    
+    // look for record-writer in the cache
+    RecordWriter writer = recordWriters.get(baseFileName);
+    
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      //FileOutputFormat.setOutputName(taskContext, baseFileName);
+      try {
+        writer = ((OutputFormat) ReflectionUtils.newInstance(
+          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
+          .getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+ 
+      // if counters are enabled, wrap the writer with context 
+      // to increment counters 
+      if (countersEnabled) {
+        writer = new RecordWriterWithCounter(writer, baseFileName, context);
+      }
+      
+      // add the record-writer to the cache
+      recordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+   // Create a taskAttemptContext for the named output with 
+   // output format and output key/value types put in the context
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+
+    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
+
+    if (taskContext != null) {
+      return taskContext;
+    }
+
+    // The following trick leverages the instantiation of a record writer via
+    // the job thus supporting arbitrary output formats.
+    context.getConfiguration().set("avro.mo.config.namedOutput",nameOutput);
+    Job job = new Job(context.getConfiguration());
+    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
+    Schema keySchema = keySchemas.get(nameOutput+"_KEYSCHEMA");
+    Schema valSchema = valSchemas.get(nameOutput+"_VALSCHEMA");
+
+    boolean isMaponly=job.getNumReduceTasks() == 0;
+
+    if(keySchema!=null)
+    {
+      if(isMaponly)
+        AvroJob.setMapOutputKeySchema(job,keySchema);
+      else
+        AvroJob.setOutputKeySchema(job,keySchema);
+    }
+    if(valSchema!=null)
+    {
+      if(isMaponly)
+        AvroJob.setMapOutputValueSchema(job,valSchema);
+      else
+        AvroJob.setOutputValueSchema(job,valSchema);
+    }
+    taskContext = new TaskAttemptContext(
+      job.getConfiguration(), context.getTaskAttemptID());
+    
+    taskContexts.put(nameOutput, taskContext);
+    
+    return taskContext;
+  }
+  
+  /**
+   * Closes all the opened outputs.
+   * 
+   * This should be called from cleanup method of map/reduce task.
+   * If overridden subclasses must invoke <code>super.close()</code> at the
+   * end of their <code>close()</code>
+   * 
+   */
+  @SuppressWarnings("unchecked")
+  public void close() throws IOException, InterruptedException {
+    for (RecordWriter writer : recordWriters.values()) {
+      writer.close(context);
+    }
+  }
+}
+
+ 

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java?rev=1349492&r1=1349491&r2=1349492&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
(original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
Tue Jun 12 19:46:29 2012
@@ -20,6 +20,7 @@ package org.apache.avro.mapreduce;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
 import org.apache.avro.file.CodecFactory;
 import org.apache.hadoop.fs.Path;
@@ -59,7 +60,10 @@ public abstract class AvroOutputFormatBa
    * @return The target output stream.
    */
   protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
-    Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
+    Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(),
+      getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
     return path.getFileSystem(context.getConfiguration()).create(path);
   }
+ 
+   
 }

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java?rev=1349492&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
(added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
Tue Jun 12 19:46:29 2012
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.apache.avro.mapred.Pair;
+
+public class TestAvroMultipleOutputs {
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  public static final Schema STATS_SCHEMA =
+      Schema.parse("{\"name\":\"stats\",\"type\":\"record\","
+          + "\"fields\":[{\"name\":\"count\",\"type\":\"int\"},"
+          + "{\"name\":\"name\",\"type\":\"string\"}]}");
+  public static final Schema STATS_SCHEMA_2 = 
+      Schema.parse("{\"name\":\"stats\",\"type\":\"record\","
+          + "\"fields\":[{\"name\":\"count1\",\"type\":\"int\"},"
+          + "{\"name\":\"name1\",\"type\":\"string\"}]}");  
+
+  private static class LineCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+    private IntWritable mOne;
+
+    @Override
+    protected void setup(Context context) {
+      mOne = new IntWritable(1);
+    }
+
+    @Override
+    protected void map(LongWritable fileByteOffset, Text line, Context context)
+        throws IOException, InterruptedException {
+      context.write(line, mOne);
+    }
+  }
+
+  private static class StatCountMapper
+      extends Mapper<AvroKey<TextStats>, NullWritable, Text, IntWritable> {
+    private IntWritable mCount;
+    private Text mText;
+
+    @Override
+    protected void setup(Context context) {
+      mCount = new IntWritable(0);
+      mText = new Text("");
+    }
+
+    @Override
+    protected void map(AvroKey<TextStats> record, NullWritable ignore, Context context)
+        throws IOException, InterruptedException {
+      mCount.set(record.datum().count);
+      mText.set(record.datum().name.toString());
+      context.write(mText, mCount);
+    }
+  }
+
+  private static class GenericStatsReducer
+      extends Reducer<Text, IntWritable, AvroKey<GenericData.Record>, NullWritable> {
+    private AvroKey<GenericData.Record> mStats;
+    private AvroMultipleOutputs amos;
+
+    @Override
+    protected void setup(Context context) {
+      mStats = new AvroKey<GenericData.Record>(null);
+      amos = new AvroMultipleOutputs(context);
+    }
+
+    @Override
+    protected void reduce(Text line, Iterable<IntWritable> counts, Context context)
+        throws IOException, InterruptedException {
+      GenericData.Record record = new GenericData.Record(STATS_SCHEMA);
+      GenericData.Record record2 = new GenericData.Record(STATS_SCHEMA_2);
+      int sum = 0;
+      for (IntWritable count : counts) {
+        sum += count.get();
+      }
+      record.put("name", new Utf8(line.toString()));
+      record.put("count", new Integer(sum));
+      mStats.datum(record);
+      context.write(mStats, NullWritable.get()); 
+      amos.write("myavro",mStats,NullWritable.get());
+      record2.put("name1", new Utf8(line.toString()));
+      record2.put("count1", new Integer(sum));
+      mStats.datum(record2); 
+      amos.write("myavro1",mStats);
+    }
+   
+    @Override
+    protected void cleanup(Context context) throws IOException,InterruptedException
+    {
+      amos.close();
+    }
+  }
+
+  private static class SpecificStatsReducer
+      extends Reducer<Text, IntWritable, AvroKey<TextStats>, NullWritable> {
+    private AvroKey<TextStats> mStats;
+    private AvroMultipleOutputs amos;
+    @Override
+    protected void setup(Context context) {
+      mStats = new AvroKey<TextStats>(null);
+      amos = new AvroMultipleOutputs(context);
+    }
+
+    @Override
+    protected void reduce(Text line, Iterable<IntWritable> counts, Context context)
+        throws IOException, InterruptedException {
+      TextStats record = new TextStats();
+      record.count = 0;
+      for (IntWritable count : counts) {
+        record.count += count.get();
+      }
+      record.name = line.toString();
+      mStats.datum(record);
+      context.write(mStats, NullWritable.get());
+      amos.write("myavro3",mStats,NullWritable.get());
+    }
+    @Override
+    protected void cleanup(Context context) throws IOException,InterruptedException
+    {
+      amos.close();
+    }
+  }
+
+  private static class SortMapper
+      extends Mapper<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>, NullWritable> {
+    @Override
+    protected void map(AvroKey<TextStats> key, NullWritable value, Context context)
+        throws IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+  private static class SortReducer
+      extends Reducer<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>, NullWritable> {
+    @Override
+    protected void reduce(AvroKey<TextStats> key, Iterable<NullWritable> ignore, Context context)
+        throws IOException, InterruptedException {
+      context.write(key, NullWritable.get());
+    }
+  }
+
+  @Test
+  public void testAvroGenericOutput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+            .toURI().toString()));
+    job.setInputFormatClass(TextInputFormat.class);
+
+    job.setMapperClass(LineCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    job.setReducerClass(GenericStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, STATS_SCHEMA);    
+    AvroMultipleOutputs.addNamedOutput(job,"myavro",AvroKeyOutputFormat.class,STATS_SCHEMA,null);
+    AvroMultipleOutputs.addNamedOutput(job,"myavro1", AvroKeyOutputFormat.class, STATS_SCHEMA_2);

+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+    outputPath.getFileSystem(job.getConfiguration()).delete(outputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/myavro-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<GenericData.Record> reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+      counts.put(((Utf8) record.get("name")).toString(), (Integer) record.get("count"));
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+
+    outputFiles = fileSystem.globStatus(outputPath.suffix("/myavro1-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new GenericDatumReader<GenericData.Record>(STATS_SCHEMA_2));
+    counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+      counts.put(((Utf8) record.get("name1")).toString(), (Integer) record.get("count1"));
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  /**
+   * Runs a text-input MapReduce job whose reducer writes specific-record
+   * ({@code TextStats}) results through AvroMultipleOutputs under the named
+   * output "myavro3", then reads the named-output file back and verifies the
+   * per-word counts.
+   */
+  @Test
+  public void testAvroSpecificOutput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+            .toURI().toString()));
+    job.setInputFormatClass(TextInputFormat.class);
+
+    job.setMapperClass(LineCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    // Register the additional named output before the job runs.
+    AvroMultipleOutputs.addNamedOutput(job, "myavro3", AvroKeyOutputFormat.class,
+        TextStats.SCHEMA$, null);
+
+    job.setReducerClass(SpecificStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out-specific");
+    // Clear output from prior runs; use the explicit recursive form rather
+    // than the deprecated delete(Path).
+    outputPath.getFileSystem(job.getConfiguration()).delete(outputPath, true);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results written to the named output were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/myavro3-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new SpecificDatumReader<TextStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (TextStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  /**
+   * Reads Avro ({@code TextStats}) input, re-aggregates the stats, and checks
+   * the records that land in the "myavro3" named output file.
+   */
+  @Test
+  public void testAvroInput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+            .toURI().toString()));
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    AvroJob.setInputKeySchema(job, TextStats.SCHEMA$);
+    AvroMultipleOutputs.addNamedOutput(job,"myavro3",AvroKeyOutputFormat.class,TextStats.SCHEMA$,null);
+
+    job.setMapperClass(StatCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    job.setReducerClass(SpecificStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+
+    Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-specific-input");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Verify the named-output file holds the expected per-word counts.
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+    FileStatus[] namedOutputFiles = fs.globStatus(outputPath.suffix("/myavro3-*"));
+    Assert.assertEquals(1, namedOutputFiles.length);
+    DataFileReader<TextStats> statsReader = new DataFileReader<TextStats>(
+        new FsInput(namedOutputFiles[0].getPath(), job.getConfiguration()),
+        new SpecificDatumReader<TextStats>());
+    Map<String, Integer> tally = new HashMap<String, Integer>();
+    try {
+      for (TextStats stats : statsReader) {
+        tally.put(stats.name.toString(), stats.count);
+      }
+    } finally {
+      statsReader.close();
+    }
+
+    Assert.assertEquals(3, tally.get("apple").intValue());
+    Assert.assertEquals(2, tally.get("banana").intValue());
+    Assert.assertEquals(1, tally.get("carrot").intValue());
+  }
+
+  /**
+   * Exercises Avro map-output serialization: the mapper emits Avro keys, the
+   * identity-style sort reducer writes them out, and the default part file is
+   * read back to verify the stats survived the shuffle unchanged.
+   */
+  @Test
+  public void testAvroMapOutput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+            .toURI().toString()));
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    AvroJob.setInputKeySchema(job, TextStats.SCHEMA$);
+
+    job.setMapperClass(SortMapper.class);
+    AvroJob.setMapOutputKeySchema(job, TextStats.SCHEMA$);
+    job.setMapOutputValueClass(NullWritable.class);
+
+    job.setReducerClass(SortReducer.class);
+    AvroJob.setOutputKeySchema(job, TextStats.SCHEMA$);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    // Use a directory name distinct from testAvroInput's "/out-specific-input"
+    // (previously copied verbatim) so the two tests can never collide.
+    Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-map-output");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/part-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<TextStats> reader = new DataFileReader<TextStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new SpecificDatumReader<TextStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (TextStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message