avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1341230 - in /avro/trunk: ./ lang/csharp/src/apache/test/Schema/ lang/java/mapred/src/main/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/
Date Mon, 21 May 2012 22:14:16 GMT
Author: cutting
Date: Mon May 21 22:14:15 2012
New Revision: 1341230

URL: http://svn.apache.org/viewvc?rev=1341230&view=rev
Log:
AVRO-1052. Java: Add AvroMultipleOutputFormat, to permit splitting mapreduce output to multiple
locations.  Contributed by Ashish Nagavaram.

Added:
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
  (with props)
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
  (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1341230&r1=1341229&r2=1341230&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon May 21 22:14:15 2012
@@ -16,6 +16,9 @@ Avro 1.7.0 (unreleased)
     AVRO-593. Java: Add support for Hadoop's newer mapreduce API.
     (Garrett Wu via cutting)
 
+    AVRO-1052. Java: Add AvroMultipleOutputFormat, to permit splitting
+    mapreduce output to multiple locations.  (Ashish Nagavaram via cutting)
+
   IMPROVEMENTS
 
     AVRO-1060. Java: Upgrade Netty to version 3.4.0.  (Karthik K via cutting)

Modified: avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs?rev=1341230&r1=1341229&r2=1341230&view=diff
==============================================================================
--- avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs (original)
+++ avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs Mon May 21 22:14:15 2012
@@ -48,7 +48,7 @@ namespace Avro.Test
             Assert.AreEqual(carefulFP, SchemaNormalization.ParsingFingerprint64(s));
         }
 
-        private static IEnumerable<object> ProvideFingerprintTestCases()
+        private static List<object[]> ProvideFingerprintTestCases()
         {
             using (StreamReader reader = new StreamReader("../../../../../share/test/data/schema-tests.txt"))
             {
@@ -56,7 +56,7 @@ namespace Avro.Test
             }
         }
 
-        private static IEnumerable<object> ProvideCanonicalTestCases()
+        private static List<object[]> ProvideCanonicalTestCases()
         {
             using (StreamReader reader = new StreamReader("../../../../../share/test/data/schema-tests.txt"))
             {

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java?rev=1341230&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java Mon May 21 22:14:15 2012
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.List;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Collections;
+
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.avro.Schema;
+
+import org.apache.hadoop.io.NullWritable;
+
+
+
+/**
+ * The AvroMultipleOutputs class simplifies writing Avro output data 
+ * to multiple outputs
+ * 
+ * <p> 
+ * Case one: writing to additional outputs other than the job default output.
+ *
+ * Each additional output, or named output, may be configured with its own
+ * <code>Schema</code> and <code>OutputFormat</code>.
+ * A named output can be a single file or a multi file. The later is refered as
+ * a multi named output which is an unbound set of files all sharing the same
+ * <code>Schema</code>.
+ * </p>
+ * <p>
+ * Case two: to write data to different files provided by user
+ * </p>
+ * 
+ * <p>
+ * AvroMultipleOutputs supports counters, by default they are disabled. The 
+ * counters group is the {@link AvroMultipleOutputs} class name. The names of the 
+ * counters are the same as the output name. These count the number of records 
+ * written to each output name. For multi
+ * named outputs the name of the counter is the concatenation of the named
+ * output, and underscore '_' and the multiname.
+ * </p>
+ * 
+ * Usage pattern for job submission:
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ * FileOutputFormat.setOutputPath(job, outDir);
+ *
+ * job.setMapperClass(MyAvroMapper.class);
+ * job.setReducerClass(HadoopReducer.class);
+ * job.set("avro.reducer",MyAvroReducer.class);
+ * ...
+ *  
+ * Schema schema;
+ * ...
+ * // Defines additional single output 'avro1' for the job
+ * AvroMultipleOutputs.addNamedOutput(job, "avro1", AvroOutputFormat.class,
+ * schema);
+ *
+ * // Defines additional output 'avro2' with different schema for the job
+ * AvroMultipleOutputs.addNamedOutput(job, "avro2",
+ *   AvroOutputFormat.class,
+ *   null); // if Schema is specified as null then the default output schema is used
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ * <pre>
+ * 
+ * public class MyAvroReducer extends
+ *   AvroReducer&lt;K, V, OUT&gt; {
+ * private MultipleOutputs amos;
+ *
+ *
+ * public void configure(JobConf conf) {
+ * ...
+ * amos = new AvroMultipleOutputs(conf);
+ * }
+ *
+ * public void reduce(K, Iterator&lt;V&gt; values,
+ * AvroCollector&lt;OUT&gt;, Reporter reporter)
+ * throws IOException {
+ * ...
+ * amos.getCollector("avro1", reporter).collect(datum);
+ * amos.getCollector("avro2", "A", reporter).collect(datum);
+ * amos.getCollector("avro3", "B", reporter).collect(datum);
+ * ...
+ * }
+ *
+ * public void close() throws IOException {
+ * amos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+
+public class AvroMultipleOutputs {
+
+  private static final String NAMED_OUTPUTS = "mo.namedOutputs";
+
+  private static final String MO_PREFIX = "mo.namedOutput.";
+
+  private static final String FORMAT = ".avro";
+  private static final String MULTI = ".multi";
+
+  private static final String COUNTERS_ENABLED = "mo.counters";
+ 
+ 
+  private static Map<String,Schema> schemaList = new HashMap<String,Schema>();
+  /**
+   * Counters group used by the counters of MultipleOutputs.
+   */
+  private static final String COUNTERS_GROUP = AvroMultipleOutputs.class.getName();
+
+  /**
+   * Checks if a named output is alreadyDefined or not.
+   *
+   * @param conf           job conf
+   * @param namedOutput    named output names
+   * @param alreadyDefined whether the existence/non-existence of
+   *                       the named output is to be checked
+   * @throws IllegalArgumentException if the output name is alreadyDefined or
+   *                                  not depending on the value of the
+   *                                  'alreadyDefined' parameter
+   */
+  private static void checkNamedOutput(JobConf conf, String namedOutput,
+                                       boolean alreadyDefined) {
+    List<String> definedChannels = getNamedOutputsList(conf);
+    if (alreadyDefined && definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' already alreadyDefined");
+    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' not defined");
+    }
+  }
+
+  /**
+   * Checks if a named output name is valid token.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkTokenName(String namedOutput) {
+    if (namedOutput == null || namedOutput.length() == 0) {
+      throw new IllegalArgumentException(
+        "Name cannot be NULL or emtpy");
+    }
+    for (char ch : namedOutput.toCharArray()) {
+      if ((ch >= 'A') && (ch <= 'Z')) {
+        continue;
+      }
+      if ((ch >= 'a') && (ch <= 'z')) {
+        continue;
+      }
+      if ((ch >= '0') && (ch <= '9')) {
+        continue;
+      }
+      throw new IllegalArgumentException(
+        "Name cannot be have a '" + ch + "' char");
+    }
+  }
+
+  /**
+   * Checks if a named output name is valid.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkNamedOutputName(String namedOutput) {
+    checkTokenName(namedOutput);
+    // name cannot be the name used for the default output
+    if (namedOutput.equals("part")) {
+      throw new IllegalArgumentException(
+        "Named output name cannot be 'part'");
+    }
+  }
+
+  /**
+   * Returns list of channel names.
+   *
+   * @param conf job conf
+   * @return List of channel Names
+   */
+  public static List<String> getNamedOutputsList(JobConf conf) {
+    List<String> names = new ArrayList<String>();
+    StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
+    while (st.hasMoreTokens()) {
+      names.add(st.nextToken());
+    }
+    return names;
+  }
+
+
+  /**
+   * Returns if a named output is multiple.
+   *
+   * @param conf        job conf
+   * @param namedOutput named output
+   * @return <code>true</code> if the name output is multi, <code>false</code>
+   *         if it is single. If the name output is not defined it returns
+   *         <code>false</code>
+   */
+  public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
+    checkNamedOutput(conf, namedOutput, false);
+    return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
+  }
+
+  /**
+   * Returns the named output OutputFormat.
+   *
+   * @param conf        job conf
+   * @param namedOutput named output
+   * @return namedOutput OutputFormat
+   */
+  public static Class<? extends OutputFormat> getNamedOutputFormatClass(
+    JobConf conf, String namedOutput) {
+    checkNamedOutput(conf, namedOutput, false);
+    return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
+      OutputFormat.class);
+  }
+
+  /**
+   * Adds a named output for the job.
+   * <p/>
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the
+   *                          default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param schema            Schema to used for this namedOutput
+   */
+  public static void addNamedOutput(JobConf conf, String namedOutput,
+                                Class<? extends OutputFormat> outputFormatClass,
+                                Schema schema) {
+    addNamedOutput(conf, namedOutput, false, outputFormatClass, schema);
+  }
+
+  /**
+   * Adds a multi named output for the job.
+   * <p/>
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the
+   *                          default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param schema            Schema to used for this namedOutput
+   */
+  public static void addMultiNamedOutput(JobConf conf, String namedOutput,
+                               Class<? extends OutputFormat> outputFormatClass,
+                               Schema schema) {
+    addNamedOutput(conf, namedOutput, true, outputFormatClass, schema);
+  }
+
+  /**
+   * Adds a named output for the job.
+   * <p/>
+   *
+   * @param conf              job conf to add the named output
+   * @param namedOutput       named output name, it has to be a word, letters
+   *                          and numbers only, cannot be the word 'part' as
+   *                          that is reserved for the
+   *                          default output.
+   * @param multi             indicates if the named output is multi
+   * @param outputFormatClass OutputFormat class.
+   * @param schema            Schema to used for this namedOutput
+   */
+  private static void addNamedOutput(JobConf conf, String namedOutput,
+                               boolean multi,
+                               Class<? extends OutputFormat> outputFormatClass,
+                               Schema schema) {
+    checkNamedOutputName(namedOutput);
+    checkNamedOutput(conf, namedOutput, true);
+    boolean isMapOnly = conf.getNumReduceTasks() == 0;
+    schemaList.put(namedOutput+"_SCHEMA", schema);
+    conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
+    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+      OutputFormat.class);
+    conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
+  }
+
+  /**
+   * Enables or disables counters for the named outputs.
+   * <p/>
+   * By default these counters are disabled.
+   * <p/>
+   * MultipleOutputs supports counters, by default the are disabled.
+   * The counters group is the {@link MultipleOutputs} class name.
+   * </p>
+   * The names of the counters are the same as the named outputs. For multi
+   * named outputs the name of the counter is the concatenation of the named
+   * output, and underscore '_' and the multiname.
+   *
+   * @param conf    job conf to enableadd the named output.
+   * @param enabled indicates if the counters will be enabled or not.
+   */
+  public static void setCountersEnabled(JobConf conf, boolean enabled) {
+    conf.setBoolean(COUNTERS_ENABLED, enabled);
+  }
+
+  /**
+   * Returns if the counters for the named outputs are enabled or not.
+   * <p/>
+   * By default these counters are disabled.
+   * <p/>
+   * MultipleOutputs supports counters, by default the are disabled.
+   * The counters group is the {@link MultipleOutputs} class name.
+   * </p>
+   * The names of the counters are the same as the named outputs. For multi
+   * named outputs the name of the counter is the concatenation of the named
+   * output, and underscore '_' and the multiname.
+   *
+   *
+   * @param conf    job conf to enableadd the named output.
+   * @return TRUE if the counters are enabled, FALSE if they are disabled.
+   */
+  public static boolean getCountersEnabled(JobConf conf) {
+    return conf.getBoolean(COUNTERS_ENABLED, false);
+  }
+
+  // instance code, to be used from Mapper/Reducer code
+
+  private JobConf conf;
+  private OutputFormat outputFormat;
+  private Set<String> namedOutputs;
+  private Map<String, RecordWriter> recordWriters;
+  private boolean countersEnabled;
+
+  /**
+   * Creates and initializes multiple named outputs support, it should be
+   * instantiated in the Mapper/Reducer configure method.
+   *
+   * @param job the job configuration object
+   */
+  public AvroMultipleOutputs(JobConf job) {
+    this.conf = job;
+    outputFormat = new InternalFileOutputFormat();
+    namedOutputs = Collections.unmodifiableSet(
+      new HashSet<String>(AvroMultipleOutputs.getNamedOutputsList(job)));
+    recordWriters = new HashMap<String, RecordWriter>();
+    countersEnabled = getCountersEnabled(job);
+  }
+
+  /**
+   * Returns iterator with the defined name outputs.
+   *
+   * @return iterator with the defined named outputs
+   */
+  public Iterator<String> getNamedOutputs() {
+    return namedOutputs.iterator();
+  }
+
+
+  // by being synchronized MultipleOutputTask can be use with a
+  // MultithreaderMapRunner.
+  private synchronized RecordWriter getRecordWriter(String namedOutput,
+                                                    String baseFileName,
+                                                    final Reporter reporter)
+    throws IOException {
+    RecordWriter writer = recordWriters.get(baseFileName);
+    if (writer == null) {
+      if (countersEnabled && reporter == null) {
+        throw new IllegalArgumentException(
+          "Counters are enabled, Reporter cannot be NULL");
+      }
+      JobConf jobConf = new JobConf(conf);
+      jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
+      FileSystem fs = FileSystem.get(conf);
+      writer = outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
+
+      if (countersEnabled) {
+        if (reporter == null) {
+          throw new IllegalArgumentException(
+            "Counters are enabled, Reporter cannot be NULL");
+        }
+        writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
+      }
+      recordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+  private static class RecordWriterWithCounter implements RecordWriter {
+    private RecordWriter writer;
+    private String counterName;
+    private Reporter reporter;
+
+    public RecordWriterWithCounter(RecordWriter writer, String counterName,
+                                   Reporter reporter) {
+      this.writer = writer;
+      this.counterName = counterName;
+      this.reporter = reporter;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public void write(Object key, Object value) throws IOException {
+      reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
+      writer.write(key, value);
+    }
+
+    public void close(Reporter reporter) throws IOException {
+      writer.close(reporter);
+    }
+  }
+
+  /**
+   * Gets the output collector for a named output.
+   * <p/>
+   *
+   * @param namedOutput the named output name
+   * @param reporter    the reporter
+   * @return the output collector for the given named output
+   * @throws IOException thrown if output collector could not be created
+   */
+  @SuppressWarnings({"unchecked"})
+  public AvroCollector getCollector(String namedOutput, Reporter reporter)
+    throws IOException {
+    return getCollector(namedOutput, null, reporter);
+  }
+
+  /**
+   * Gets the output collector for a multi named output.
+   * <p/>
+   *
+   * @param namedOutput the named output name
+   * @param multiName   the multi name part
+   * @param reporter    the reporter
+   * @return the output collector for the given named output
+   * @throws IOException thrown if output collector could not be created
+   */
+  @SuppressWarnings({"unchecked"})
+  public AvroCollector getCollector(String namedOutput, String multiName,
+                                      Reporter reporter)
+    throws IOException {
+
+    checkNamedOutputName(namedOutput);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+    boolean multi = isMultiNamedOutput(conf, namedOutput);
+
+    if (!multi && multiName != null) {
+      throw new IllegalArgumentException("Name output '" + namedOutput +
+        "' has not been defined as multi");
+    }
+    if (multi) {
+      checkTokenName(multiName);
+    }
+
+    String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
+
+    final RecordWriter writer =
+      getRecordWriter(namedOutput, baseFileName, reporter);
+
+    return new AvroCollector() {
+   
+      @SuppressWarnings({"unchecked"})
+      public void collect(Object key) throws IOException{
+       AvroWrapper wrapper = new AvroWrapper(key);
+       writer.write(wrapper, NullWritable.get());
+      }
+      
+      public void collect(Object key,Object value) throws IOException
+      {
+        writer.write(key,value);
+      }  
+    
+    };
+  }
+
+  /**
+   * Closes all the opened named outputs.
+   * <p/>
+   * If overriden subclasses must invoke <code>super.close()</code> at the
+   * end of their <code>close()</code>
+   *
+   * @throws java.io.IOException thrown if any of the MultipleOutput files
+   *                             could not be closed properly.
+   */
+  public void close() throws IOException {
+    for (RecordWriter writer : recordWriters.values()) {
+      writer.close(null);
+    }
+  }
+  
+  private static class InternalFileOutputFormat extends FileOutputFormat<Object, Object>
{
+   public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
+
+   @SuppressWarnings({"unchecked"})
+   public RecordWriter<Object, Object> getRecordWriter(FileSystem fs,JobConf job, String
baseFileName, Progressable arg3) throws IOException
+   {
+   String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
+   String fileName = getUniqueName(job, baseFileName);
+   Schema schema = schemaList.get(nameOutput+"_SCHEMA");
+   JobConf outputConf = new JobConf(job);
+   outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
+   if(schema!=null)
+    AvroJob.setOutputSchema(outputConf,schema);
+   OutputFormat outputFormat = outputConf.getOutputFormat();
+   return outputFormat.getRecordWriter(fs, outputConf, fileName, arg3);
+   }   
+  }
+}
+

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java?rev=1341230&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java Mon May 21 22:14:15 2012
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.Locale;
+
+import org.apache.hadoop.io.Text;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+public class TestAvroMultipleOutputs {
+
+  /** Splits each input line into words and emits a (word, 1) pair per word. */
+  public static class MapImpl extends AvroMapper<Utf8, Pair<Utf8, Long>> {
+
+    @Override
+    public void map(Utf8 text, AvroCollector<Pair<Utf8, Long>> collector,
+                    Reporter reporter) throws IOException {
+      StringTokenizer tokens = new StringTokenizer(text.toString());
+      while (tokens.hasMoreTokens()) {
+        collector.collect(new Pair<Utf8, Long>(new Utf8(tokens.nextToken()), 1L));
+      }
+    }
+  }
+
+  /**
+   * Sums the counts per word and writes each result to the default output,
+   * to the "myavro" named output (as a Pair) and to the "myavro1" named
+   * output (as the Pair's string form).
+   */
+  public static class ReduceImpl
+    extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
+    private AvroMultipleOutputs amos;
+
+    public void configure(JobConf job) {
+      amos = new AvroMultipleOutputs(job);
+    }
+
+    @Override
+    public void reduce(Utf8 word, Iterable<Long> counts,
+                       AvroCollector<Pair<Utf8, Long>> collector,
+                       Reporter reporter) throws IOException {
+      long sum = 0;
+      for (long count : counts) {
+        sum += count;
+      }
+      Pair<Utf8, Long> outputValue = new Pair<Utf8, Long>(word, sum);
+      amos.getCollector("myavro", reporter).collect(outputValue);
+      amos.getCollector("myavro1", reporter).collect(outputValue.toString());
+      collector.collect(new Pair<Utf8, Long>(word, sum));
+    }
+
+    public void close() throws IOException {
+      amos.close();
+    }
+  }
+
+  // The projection tests read files produced by testJob, so order matters.
+  @Test public void runTestsInOrder() throws Exception {
+    testJob();
+    testProjection();
+    testProjection1();
+  }
+
+  /** Runs word count, also writing to the "myavro" and "myavro1" outputs. */
+  @SuppressWarnings("deprecation")
+  public void testJob() throws Exception {
+    JobConf job = new JobConf();
+
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+
+    outputPath.getFileSystem(job).delete(outputPath);
+    WordCountUtil.writeLinesFile();
+
+    job.setJobName("AvroMultipleOutputs");
+
+    AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
+    AvroJob.setOutputSchema(job,
+                            new Pair<Utf8, Long>(new Utf8(""), 0L).getSchema());
+
+    AvroJob.setMapperClass(job, MapImpl.class);
+    AvroJob.setReducerClass(job, ReduceImpl.class);
+
+    FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+    FileOutputFormat.setOutputPath(job, outputPath);
+    FileOutputFormat.setCompressOutput(job, false);
+    AvroMultipleOutputs.addNamedOutput(job, "myavro", AvroOutputFormat.class,
+        new Pair<Utf8, Long>(new Utf8(""), 0L).getSchema());
+    AvroMultipleOutputs.addNamedOutput(job, "myavro1", AvroOutputFormat.class,
+        Schema.create(Schema.Type.STRING));
+
+    WordCountUtil.setMeta(job);
+
+    JobClient.runJob(job);
+
+    WordCountUtil.validateCountsFile();
+  }
+
+  /**
+   * Reads the "myavro" output with a projected reader schema: the missing
+   * "rank" field must take its default and the counts must sum correctly.
+   */
+  @SuppressWarnings("deprecation")
+  public void testProjection() throws Exception {
+    JobConf job = new JobConf();
+
+    Integer defaultRank = Integer.valueOf(-1);
+
+    String jsonSchema =
+      "{\"type\":\"record\"," +
+      "\"name\":\"org.apache.avro.mapred.Pair\","+
+      "\"fields\": [ " +
+        "{\"name\":\"rank\", \"type\":\"int\", \"default\": -1}," +
+        "{\"name\":\"value\", \"type\":\"long\"}" +
+      "]}";
+
+    Schema readerSchema = Schema.parse(jsonSchema);
+
+    AvroJob.setInputSchema(job, readerSchema);
+
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path inputPath = new Path(dir + "/out" + "/myavro-r-00000.avro");
+    FileStatus fileStatus = FileSystem.get(job).getFileStatus(inputPath);
+    FileSplit fileSplit = new FileSplit(inputPath, 0, fileStatus.getLen(), job);
+
+    AvroRecordReader<Pair<Integer, Long>> recordReader =
+      new AvroRecordReader<Pair<Integer, Long>>(job, fileSplit);
+    AvroWrapper<Pair<Integer, Long>> inputPair =
+      new AvroWrapper<Pair<Integer, Long>>(null);
+    NullWritable ignore = NullWritable.get();
+
+    long sumOfCounts = 0;
+    long numOfCounts = 0;
+    try {
+      while (recordReader.next(inputPair, ignore)) {
+        Assert.assertEquals(defaultRank, (Integer) inputPair.datum().get(0));
+        sumOfCounts += (Long) inputPair.datum().get(1);
+        numOfCounts++;
+      }
+    } finally {
+      recordReader.close();
+    }
+
+    Assert.assertEquals(WordCountUtil.COUNTS.size(), numOfCounts);
+
+    long actualSumOfCounts = 0;
+    for (Long count : WordCountUtil.COUNTS.values()) {
+      actualSumOfCounts += count;
+    }
+
+    Assert.assertEquals(actualSumOfCounts, sumOfCounts);
+  }
+
+  /** Reads the "myavro1" output, which was written with a string schema. */
+  @SuppressWarnings("deprecation")
+  public void testProjection1() throws Exception {
+    JobConf job = new JobConf();
+    Schema readerSchema = Schema.create(Schema.Type.STRING);
+    AvroJob.setInputSchema(job, readerSchema);
+
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path inputPath = new Path(dir + "/out" + "/myavro1-r-00000.avro");
+    FileStatus fileStatus = FileSystem.get(job).getFileStatus(inputPath);
+    FileSplit fileSplit = new FileSplit(inputPath, 0, fileStatus.getLen(), job);
+    AvroWrapper<Utf8> inputPair = new AvroWrapper<Utf8>(null);
+    NullWritable ignore = NullWritable.get();
+    AvroRecordReader<Utf8> recordReader = new AvroRecordReader<Utf8>(job, fileSplit);
+    long sumOfCounts = 0;
+    long numOfCounts = 0;
+    try {
+      while (recordReader.next(inputPair, ignore)) {
+        // Each datum is the string form of a Pair<Utf8, Long>; the count is
+        // the text after the last ':' with the closing brace removed. Using
+        // the last ':' keeps this correct even if a word contains ':'.
+        String datum = inputPair.datum().toString();
+        String count = datum.substring(datum.lastIndexOf(':') + 1);
+        sumOfCounts += Long.parseLong(count.replace("}", "").trim());
+        numOfCounts++;
+      }
+    } finally {
+      recordReader.close();
+    }
+    Assert.assertEquals(WordCountUtil.COUNTS.size(), numOfCounts);
+    long actualSumOfCounts = 0;
+    for (Long count : WordCountUtil.COUNTS.values()) {
+      actualSumOfCounts += count;
+    }
+    Assert.assertEquals(actualSumOfCounts, sumOfCounts);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message