avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1325903 [2/4] - in /avro/trunk: ./ lang/java/ lang/java/mapred/ lang/java/mapred/src/main/java/org/apache/avro/hadoop/ lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/ lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/ la...
Date Fri, 13 Apr 2012 19:03:14 GMT
Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import java.util.Collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+/**
+ * The {@link org.apache.hadoop.io.serializer.Serialization} used by jobs configured with
+ * {@link org.apache.avro.mapreduce.AvroJob}.
+ *
+ * <p>Hadoop discovers Serialization implementations through the
+ * <code>io.serializations</code> configuration entry; use {@link #addToConfiguration}
+ * to register this one.  Key and value schemas are carried in the Configuration as
+ * JSON strings under the four conf keys below.</p>
+ *
+ * @param <T> The Java type of the Avro data to serialize.
+ */
+public class AvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> {
+  /** Conf key for the writer schema of the AvroKey datum being serialized/deserialized. */
+  private static final String CONF_KEY_WRITER_SCHEMA = "avro.serialization.key.writer.schema";
+
+  /** Conf key for the reader schema of the AvroKey datum being serialized/deserialized. */
+  private static final String CONF_KEY_READER_SCHEMA = "avro.serialization.key.reader.schema";
+
+  /** Conf key for the writer schema of the AvroValue datum being serialized/deserialized. */
+  private static final String CONF_VALUE_WRITER_SCHEMA = "avro.serialization.value.writer.schema";
+
+  /** Conf key for the reader schema of the AvroValue datum being serialized/deserialized. */
+  private static final String CONF_VALUE_READER_SCHEMA = "avro.serialization.value.reader.schema";
+
+  /**
+   * Determines whether this serialization can handle the given class.
+   *
+   * @param c The class to check.
+   * @return True only if <code>c</code> is AvroKey or AvroValue (or a subclass of either).
+   */
+  @Override
+  public boolean accept(Class<?> c) {
+    return AvroKey.class.isAssignableFrom(c) || AvroValue.class.isAssignableFrom(c);
+  }
+
+  /**
+   * Gets an object capable of deserializing the output from a Mapper.
+   *
+   * @param c The class to get a deserializer for.
+   * @return A deserializer for objects of class <code>c</code>.
+   */
+  @Override
+  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
+    Configuration conf = getConf();
+    // Key and value schemas live under different conf keys, so dispatch on the wrapper type.
+    if (AvroKey.class.isAssignableFrom(c)) {
+      return new AvroKeyDeserializer<T>(getKeyWriterSchema(conf), getKeyReaderSchema(conf));
+    } else if (AvroValue.class.isAssignableFrom(c)) {
+      return new AvroValueDeserializer<T>(getValueWriterSchema(conf), getValueReaderSchema(conf));
+    } else {
+      throw new IllegalStateException("Only AvroKey and AvroValue are supported.");
+    }
+  }
+
+  /**
+   * Gets an object capable of serializing output from a Mapper.
+   *
+   * @param c The class to get a serializer for.
+   * @return A serializer for objects of class <code>c</code>.
+   */
+  @Override
+  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
+    Schema schema;
+    // Serialization only needs the writer schema; pick the one matching the wrapper type.
+    if (AvroKey.class.isAssignableFrom(c)) {
+      schema = getKeyWriterSchema(getConf());
+    } else if (AvroValue.class.isAssignableFrom(c)) {
+      schema = getValueWriterSchema(getConf());
+    } else {
+      throw new IllegalStateException("Only AvroKey and AvroValue are supported.");
+    }
+    return new AvroSerializer<T>(schema);
+  }
+
+  /**
+   * Adds the AvroSerialization scheme to the configuration, so SerializationFactory
+   * instances constructed from the given configuration will be aware of it.
+   *
+   * @param conf The configuration to add AvroSerialization to.
+   */
+  public static void addToConfiguration(Configuration conf) {
+    Collection<String> serializations = conf.getStringCollection("io.serializations");
+    // Append only if absent so repeated calls do not duplicate the entry.
+    if (!serializations.contains(AvroSerialization.class.getName())) {
+      serializations.add(AvroSerialization.class.getName());
+      conf.setStrings("io.serializations",
+          serializations.toArray(new String[serializations.size()]));
+    }
+  }
+
+  /**
+   * Sets the writer schema of the AvroKey datum that is being serialized/deserialized.
+   *
+   * @param conf The configuration.
+   * @param schema The Avro key schema.  May not be null.
+   * @throws IllegalArgumentException If <code>schema</code> is null.
+   */
+  public static void setKeyWriterSchema(Configuration conf, Schema schema) {
+    if (null == schema) {
+      throw new IllegalArgumentException("Writer schema may not be null");
+    }
+    conf.set(CONF_KEY_WRITER_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Sets the reader schema of the AvroKey datum that is being serialized/deserialized.
+   *
+   * @param conf The configuration.
+   * @param schema The Avro key schema.
+   */
+  public static void setKeyReaderSchema(Configuration conf, Schema schema) {
+    conf.set(CONF_KEY_READER_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Sets the writer schema of the AvroValue datum that is being serialized/deserialized.
+   *
+   * @param conf The configuration.
+   * @param schema The Avro value schema.  May not be null.
+   * @throws IllegalArgumentException If <code>schema</code> is null.
+   */
+  public static void setValueWriterSchema(Configuration conf, Schema schema) {
+    if (null == schema) {
+      throw new IllegalArgumentException("Writer schema may not be null");
+    }
+    conf.set(CONF_VALUE_WRITER_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Sets the reader schema of the AvroValue datum that is being serialized/deserialized.
+   *
+   * @param conf The configuration.
+   * @param schema The Avro value schema.
+   */
+  public static void setValueReaderSchema(Configuration conf, Schema schema) {
+    conf.set(CONF_VALUE_READER_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Gets the writer schema of the AvroKey datum that is being serialized/deserialized.
+   *
+   * @param conf The configuration.
+   * @return The Avro key writer schema, or null if none was set.
+   */
+  public static Schema getKeyWriterSchema(Configuration conf) {
+    // Schemas are stored as JSON strings; an absent key means "not configured".
+    String json = conf.get(CONF_KEY_WRITER_SCHEMA);
+    return null == json ? null : Schema.parse(json);
+  }
+
+  /**
+   * Gets the reader schema of the AvroKey datum that is being serialized/deserialized.
+   *
+   * @param conf The configuration.
+   * @return The Avro key reader schema, or null if none was set.
+   */
+  public static Schema getKeyReaderSchema(Configuration conf) {
+    String json = conf.get(CONF_KEY_READER_SCHEMA);
+    return null == json ? null : Schema.parse(json);
+  }
+
+  /**
+   * Gets the writer schema of the AvroValue datum that is being serialized/deserialized.
+   *
+   * @param conf The configuration.
+   * @return The Avro value writer schema, or null if none was set.
+   */
+  public static Schema getValueWriterSchema(Configuration conf) {
+    String json = conf.get(CONF_VALUE_WRITER_SCHEMA);
+    return null == json ? null : Schema.parse(json);
+  }
+
+  /**
+   * Gets the reader schema of the AvroValue datum that is being serialized/deserialized.
+   *
+   * @param conf The configuration.
+   * @return The Avro value reader schema, or null if none was set.
+   */
+  public static Schema getValueReaderSchema(Configuration conf) {
+    String json = conf.get(CONF_VALUE_READER_SCHEMA);
+    return null == json ? null : Schema.parse(json);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerialization.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.io.serializer.Serializer;
+
+/**
+ * Serializes AvroWrapper objects within Hadoop.
+ *
+ * <p>Keys and values containing Avro types are more efficiently serialized outside of the
+ * WritableSerialization model, so they are wrapped in {@link
+ * org.apache.avro.mapred.AvroWrapper} objects and serialization is handled by this
+ * class.</p>
+ *
+ * <p>MapReduce jobs that use AvroWrapper objects as keys or values need to be configured
+ * with {@link org.apache.avro.hadoop.io.AvroSerialization}.  Use {@link
+ * org.apache.avro.mapreduce.AvroJob} to help with Job configuration.</p>
+ *
+ * <p>NOTE(review): this class is not thread-safe — the encoder and output stream are
+ * mutable instance state shared across calls.</p>
+ *
+ * @param <T> The Java type of the Avro data.
+ */
+public class AvroSerializer<T> implements Serializer<AvroWrapper<T>> {
+  /**
+   * The block size for the Avro encoder.
+   *
+   * This number was copied from the AvroSerialization of org.apache.avro.mapred in Avro 1.5.1.
+   *
+   * TODO(gwu): Do some benchmarking with different numbers here to see if it is important.
+   */
+  private static final int AVRO_ENCODER_BLOCK_SIZE_BYTES = 512;
+
+  /** A factory for creating Avro datum encoders, shared by all instances. */
+  private static EncoderFactory mEncoderFactory
+      = new EncoderFactory().configureBlockSize(AVRO_ENCODER_BLOCK_SIZE_BYTES);
+
+  /** The writer schema for the data to serialize. */
+  private final Schema mWriterSchema;
+
+  /** The Avro datum writer for serializing. */
+  private final DatumWriter<T> mAvroDatumWriter;
+
+  /** The Avro encoder for serializing.  Null until open() is first called. */
+  private BinaryEncoder mAvroEncoder;
+
+  /** The output stream for serializing.  Null until open() is first called. */
+  private OutputStream mOutputStream;
+
+  /**
+   * Constructor.
+   *
+   * @param writerSchema The writer schema for the Avro data being serialized.  May not be null.
+   * @throws IllegalArgumentException If <code>writerSchema</code> is null.
+   */
+  public AvroSerializer(Schema writerSchema) {
+    if (null == writerSchema) {
+      throw new IllegalArgumentException("Writer schema may not be null");
+    }
+    mWriterSchema = writerSchema;
+    mAvroDatumWriter = new SpecificDatumWriter<T>(writerSchema);
+  }
+
+  /**
+   * Gets the writer schema being used for serialization.
+   *
+   * @return The writer schema.
+   */
+  public Schema getWriterSchema() {
+    return mWriterSchema;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void open(OutputStream outputStream) throws IOException {
+    mOutputStream = outputStream;
+    // Passing the previous encoder (null on first call) lets the factory reuse it
+    // rather than allocate a new one per open().
+    mAvroEncoder = mEncoderFactory.binaryEncoder(outputStream, mAvroEncoder);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void serialize(AvroWrapper<T> avroWrapper) throws IOException {
+    mAvroDatumWriter.write(avroWrapper.datum(), mAvroEncoder);
+    // This would be a lot faster if the Serializer interface had a flush() method and the
+    // Hadoop framework called it when needed.  For now, we'll have to flush on every record.
+    mAvroEncoder.flush();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close() throws IOException {
+    // Every record was already flushed in serialize(), so only the stream needs closing.
+    mOutputStream.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+
+/**
+ * Deserializes AvroValue objects within Hadoop.
+ *
+ * @param <D> The java type of the avro data to deserialize.
+ *
+ * @see org.apache.avro.hadoop.io.AvroDeserializer
+ */
+public class AvroValueDeserializer<D> extends AvroDeserializer<AvroWrapper<D>, D> {
+  /**
+   * Constructor.
+   *
+   * @param writerSchema The Avro writer schema for the data to deserialize.
+   * @param readerSchema The Avro reader schema for the data to deserialize.
+   */
+  public AvroValueDeserializer(Schema writerSchema, Schema readerSchema) {
+    super(writerSchema, readerSchema);
+  }
+
+  /**
+   * Creates a new empty <code>AvroValue</code> instance.
+   *
+   * @return a new empty AvroValue.
+   */
+  @Override
+  protected AvroWrapper<D> createAvroWrapper() {
+    // The null datum is a placeholder; the deserializer fills it in per record.
+    return new AvroValue<D>(null);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroValueDeserializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/util/AvroCharSequenceComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/util/AvroCharSequenceComparator.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/util/AvroCharSequenceComparator.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/util/AvroCharSequenceComparator.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.util;
+
+import java.util.Comparator;
+
+/**
+ * Compares Avro string data (data with schema <i>"string"</i>).
+ *
+ * <p>The only case where comparing Avro objects does not work using their natural order
+ * is when the schema is <i>"string"</i>.  The Avro string schema maps to the Java
+ * <code>CharSequence</code> interface, which does not define <code>equals</code>,
+ * <code>hashCode</code>, or <code>compareTo</code>.</p>
+ *
+ * <p>Using this comparator enables comparisons between <code>String</code> and
+ * <code>Utf8</code> objects that are both valid when working with Avro strings.</p>
+ *
+ * @param <T> The type of object to compare.  Must implement CharSequence at runtime;
+ *     non-CharSequence arguments cause a RuntimeException from {@link #compare}.
+ */
+public class AvroCharSequenceComparator<T> implements Comparator<T> {
+  /** A singleton instance. */
+  public static final AvroCharSequenceComparator<CharSequence> INSTANCE
+      = new AvroCharSequenceComparator<CharSequence>();
+
+  /**
+   * Compares two objects that must both be CharSequences.
+   *
+   * @throws RuntimeException If either argument is not a CharSequence.
+   */
+  @Override
+  public int compare(T o1, T o2) {
+    if (!(o1 instanceof CharSequence) || !(o2 instanceof CharSequence)) {
+      throw new RuntimeException(
+          "Attempted use of AvroCharSequenceComparator on non-CharSequence objects: "
+          + o1.getClass().getName() + " and " + o2.getClass().getName());
+    }
+    return compareCharSequence((CharSequence) o1, (CharSequence) o2);
+  }
+
+  /**
+   * Compares the CharSequences <code>o1</code> and <code>o2</code>.
+   *
+   * @param o1 The left charsequence.
+   * @param o2 The right charsequence.
+   * @return a negative integer, zero, or a positive integer if the first argument is
+   *     less than, equal to, or greater than the second, respectively.
+   */
+  private int compareCharSequence(CharSequence o1, CharSequence o2) {
+    // Iterate to the longer length; compareCharacter handles the index running past
+    // the end of the shorter sequence by falling back to a length comparison.
+    for (int i = 0; i < Math.max(o1.length(), o2.length()); i++) {
+      int charComparison = compareCharacter(o1, o2, i);
+      if (0 != charComparison) {
+        return charComparison;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Compares the characters of <code>o1</code> and <code>o2</code> at index <code>index</code>.
+   *
+   * @param o1 The left charsequence.
+   * @param o2 The right charsequence.
+   * @param index The zero-based index into the charsequences to compare.
+   * @return a negative integer, zero, or a positive integer if the first argument is
+   *     less than, equal to, or greater than the second, respectively.
+   */
+  private int compareCharacter(CharSequence o1, CharSequence o2, int index) {
+    if (index < o1.length() && index < o2.length()) {
+      return Character.valueOf(o1.charAt(index)).compareTo(Character.valueOf(o2.charAt(index)));
+    }
+    if (index >= o1.length() && index >= o2.length()) {
+      return 0;
+    }
+    // Both lengths are non-negative ints, so this subtraction cannot overflow.
+    return o1.length() - o2.length();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/util/AvroCharSequenceComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,192 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.avro.hadoop.io.AvroKeyComparator;
+import org.apache.avro.hadoop.io.AvroSerialization;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Utility methods for configuring jobs that work with Avro.
+ *
+ * <p>When using Avro data as MapReduce keys and values, data must be wrapped in a
+ * suitable AvroWrapper implementation.  MapReduce keys must be wrapped in an AvroKey
+ * object, and MapReduce values must be wrapped in an AvroValue object.</p>
+ *
+ * <p>Suppose you would like to write a line count mapper that reads from a text file. If
+ * instead of using a Text and IntWritable output value, you would like to use Avro data
+ * with a schema of <i>"string"</i> and <i>"int"</i>, respectively, you may parameterize
+ * your mapper with {@code AvroKey<CharSequence>} and {@code AvroValue<Integer>}
+ * types.  Then, use the <code>setMapOutputKeySchema()</code> and
+ * <code>setMapOutputValueSchema()</code> methods to set writer schemas for the records
+ * you will generate.</p>
+ */
+public final class AvroJob {
+  /** Disable the constructor for this utility class. */
+  private AvroJob() {}
+
+  /** Configuration key for the input key schema. */
+  private static final String CONF_INPUT_KEY_SCHEMA = "avro.schema.input.key";
+
+  /** Configuration key for the input value schema. */
+  private static final String CONF_INPUT_VALUE_SCHEMA = "avro.schema.input.value";
+
+  /** Configuration key for the output key schema. */
+  private static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
+
+  /** Configuration key for the output value schema. */
+  private static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";
+
+  /**
+   * Sets the job input key schema.
+   *
+   * @param job The job to configure.
+   * @param schema The input key schema.
+   */
+  public static void setInputKeySchema(Job job, Schema schema) {
+    job.getConfiguration().set(CONF_INPUT_KEY_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Sets the job input value schema.
+   *
+   * @param job The job to configure.
+   * @param schema The input value schema.
+   */
+  public static void setInputValueSchema(Job job, Schema schema) {
+    job.getConfiguration().set(CONF_INPUT_VALUE_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Sets the map output key schema.
+   *
+   * <p>Also configures the AvroKey map output class, Avro-aware sort/grouping
+   * comparators, and registers AvroSerialization with the job.</p>
+   *
+   * @param job The job to configure.
+   * @param schema The map output key schema.
+   */
+  public static void setMapOutputKeySchema(Job job, Schema schema) {
+    job.setMapOutputKeyClass(AvroKey.class);
+    job.setGroupingComparatorClass(AvroKeyComparator.class);
+    job.setSortComparatorClass(AvroKeyComparator.class);
+    // The same schema is used as both writer and reader schema for intermediate data.
+    AvroSerialization.setKeyWriterSchema(job.getConfiguration(), schema);
+    AvroSerialization.setKeyReaderSchema(job.getConfiguration(), schema);
+    AvroSerialization.addToConfiguration(job.getConfiguration());
+  }
+
+  /**
+   * Sets the map output value schema.
+   *
+   * <p>Also configures the AvroValue map output class and registers AvroSerialization
+   * with the job.</p>
+   *
+   * @param job The job to configure.
+   * @param schema The map output value schema.
+   */
+  public static void setMapOutputValueSchema(Job job, Schema schema) {
+    job.setMapOutputValueClass(AvroValue.class);
+    AvroSerialization.setValueWriterSchema(job.getConfiguration(), schema);
+    AvroSerialization.setValueReaderSchema(job.getConfiguration(), schema);
+    AvroSerialization.addToConfiguration(job.getConfiguration());
+  }
+
+  /**
+   * Sets the job output key schema.
+   *
+   * @param job The job to configure.
+   * @param schema The job output key schema.
+   */
+  public static void setOutputKeySchema(Job job, Schema schema) {
+    job.setOutputKeyClass(AvroKey.class);
+    job.getConfiguration().set(CONF_OUTPUT_KEY_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Sets the job output value schema.
+   *
+   * @param job The job to configure.
+   * @param schema The job output value schema.
+   */
+  public static void setOutputValueSchema(Job job, Schema schema) {
+    job.setOutputValueClass(AvroValue.class);
+    job.getConfiguration().set(CONF_OUTPUT_VALUE_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Gets the job input key schema.
+   *
+   * @param conf The job configuration.
+   * @return The job input key schema, or null if not set.
+   */
+  public static Schema getInputKeySchema(Configuration conf) {
+    String schemaString = conf.get(CONF_INPUT_KEY_SCHEMA);
+    return schemaString != null ? Schema.parse(schemaString) : null;
+  }
+
+  /**
+   * Gets the job input value schema.
+   *
+   * @param conf The job configuration.
+   * @return The job input value schema, or null if not set.
+   */
+  public static Schema getInputValueSchema(Configuration conf) {
+    String schemaString = conf.get(CONF_INPUT_VALUE_SCHEMA);
+    return schemaString != null ? Schema.parse(schemaString) : null;
+  }
+
+  /**
+   * Gets the map output key schema.
+   *
+   * @param conf The job configuration.
+   * @return The map output key schema, or null if not set.
+   */
+  public static Schema getMapOutputKeySchema(Configuration conf) {
+    // Stored via AvroSerialization by setMapOutputKeySchema(), so read it back from there.
+    return AvroSerialization.getKeyWriterSchema(conf);
+  }
+
+  /**
+   * Gets the map output value schema.
+   *
+   * @param conf The job configuration.
+   * @return The map output value schema, or null if not set.
+   */
+  public static Schema getMapOutputValueSchema(Configuration conf) {
+    return AvroSerialization.getValueWriterSchema(conf);
+  }
+
+  /**
+   * Gets the job output key schema.
+   *
+   * @param conf The job configuration.
+   * @return The job output key schema, or null if not set.
+   */
+  public static Schema getOutputKeySchema(Configuration conf) {
+    String schemaString = conf.get(CONF_OUTPUT_KEY_SCHEMA);
+    return schemaString != null ? Schema.parse(schemaString) : null;
+  }
+
+  /**
+   * Gets the job output value schema.
+   *
+   * @param conf The job configuration.
+   * @return The job output value schema, or null if not set.
+   */
+  public static Schema getOutputValueSchema(Configuration conf) {
+    String schemaString = conf.get(CONF_OUTPUT_VALUE_SCHEMA);
+    return schemaString != null ? Schema.parse(schemaString) : null;
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyInputFormat.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyInputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyInputFormat.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A MapReduce InputFormat that can handle Avro container files.
+ *
+ * <p>Keys are AvroKey wrapper objects that contain the Avro data.  Since Avro
+ * container files store only records (not key/value pairs), the value from
+ * this InputFormat is a NullWritable.</p>
+ *
+ * @param <T> The Java type of the Avro records being read.
+ */
+public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
+  private static final Logger LOG = LoggerFactory.getLogger(AvroKeyInputFormat.class);
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
+    if (null == readerSchema) {
+      LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
+      LOG.info("Using a reader schema equal to the writer schema.");
+    }
+    // A null reader schema is passed through deliberately; per the log message above,
+    // the record reader then falls back to the file's writer schema.
+    return new AvroKeyRecordReader<T>(readerSchema);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FileOutputFormat for writing Avro container files.
+ *
+ * <p>Since Avro container files only contain records (not key/value pairs), this output
+ * format ignores the value.</p>
+ *
+ * @param <T> The (java) type of the Avro data to write.
+ */
+public class AvroKeyOutputFormat<T> extends AvroOutputFormatBase<AvroKey<T>, NullWritable> {
+  /** A factory for creating record writers. Parameterized to avoid raw-type use. */
+  private final RecordWriterFactory<T> mRecordWriterFactory;
+
+  /**
+   * Constructor.  Uses the default record writer factory.
+   */
+  public AvroKeyOutputFormat() {
+    this(new RecordWriterFactory<T>());
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param recordWriterFactory A factory for creating record writers.
+   */
+  protected AvroKeyOutputFormat(RecordWriterFactory<T> recordWriterFactory) {
+    mRecordWriterFactory = recordWriterFactory;
+  }
+
+  /**
+   * A factory for creating record writers.
+   *
+   * @param <T> The java type of the avro record to write.
+   */
+  protected static class RecordWriterFactory<T> {
+    /**
+     * Creates a new record writer instance.
+     *
+     * @param writerSchema The writer schema for the records to write.
+     * @param compressionCodec The compression type for the writer file.
+     * @param outputStream The target output stream for the records.
+     * @throws IOException If the record writer cannot be opened.
+     */
+    protected RecordWriter<AvroKey<T>, NullWritable> create(
+        Schema writerSchema, CodecFactory compressionCodec, OutputStream outputStream)
+        throws IOException {
+      return new AvroKeyRecordWriter<T>(writerSchema, compressionCodec, outputStream);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    // Get the writer schema; unlike the reader schema, this one is mandatory.
+    Schema writerSchema = AvroJob.getOutputKeySchema(context.getConfiguration());
+    if (null == writerSchema) {
+      throw new IOException(
+          "AvroKeyOutputFormat requires an output schema. Use AvroJob.setOutputKeySchema().");
+    }
+
+    return mRecordWriterFactory.create(
+        writerSchema, getCompressionCodec(context), getAvroFileOutputStream(context));
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordReader.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordReader.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordReader.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads records from an input split representing a chunk of an Avro container file.
+ *
+ * @param <T> The (java) type of data in Avro container file.
+ */
+public class AvroKeyRecordReader<T> extends AvroRecordReaderBase<AvroKey<T>, NullWritable, T> {
+  /** A reusable object to hold records of the Avro container file. */
+  private final AvroKey<T> mCurrentRecord;
+
+  /**
+   * Constructor.
+   *
+   * @param readerSchema The reader schema to use for the records in the Avro container file.
+   */
+  public AvroKeyRecordReader(Schema readerSchema) {
+    super(readerSchema);
+    mCurrentRecord = new AvroKey<T>(null);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    boolean hasNext = super.nextKeyValue();
+    // Clear the wrapped datum once the input is exhausted so getCurrentKey() cannot
+    // return a stale record, mirroring the behavior of AvroKeyValueRecordReader.
+    mCurrentRecord.datum(hasNext ? getCurrentRecord() : null);
+    return hasNext;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public AvroKey<T> getCurrentKey() throws IOException, InterruptedException {
+    return mCurrentRecord;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public NullWritable getCurrentValue() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes Avro records to an Avro container file output stream.
+ *
+ * @param <T> The Java type of the Avro data to write.
+ */
+public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
+  /** The Avro container file writer that records are appended to. */
+  private final DataFileWriter<T> mAvroFileWriter;
+
+  /**
+   * Constructor.
+   *
+   * @param writerSchema The writer schema for the records in the Avro container file.
+   * @param compressionCodec A compression codec factory for the Avro container file.
+   * @param outputStream The output stream to write the Avro container file to.
+   * @throws IOException If the record writer cannot be opened.
+   */
+  public AvroKeyRecordWriter(Schema writerSchema, CodecFactory compressionCodec,
+      OutputStream outputStream) throws IOException {
+    // Open an Avro container file on the output stream with the configured codec.
+    SpecificDatumWriter<T> datumWriter = new SpecificDatumWriter<T>(writerSchema);
+    DataFileWriter<T> fileWriter = new DataFileWriter<T>(datumWriter);
+    fileWriter.setCodec(compressionCodec);
+    fileWriter.create(writerSchema, outputStream);
+    mAvroFileWriter = fileWriter;
+  }
+
+  /** Appends the wrapped Avro datum to the container file; the value is ignored. */
+  @Override
+  public void write(AvroKey<T> record, NullWritable ignore) throws IOException {
+    mAvroFileWriter.append(record.datum());
+  }
+
+  /** Closes the underlying Avro container file, flushing any buffered records. */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException {
+    mAvroFileWriter.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueInputFormat.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueInputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueInputFormat.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A MapReduce InputFormat that reads from Avro container files of key/value generic records.
+ *
+ * <p>Avro container files that container generic records with the two fields 'key' and
+ * 'value' are expected.  The contents of the 'key' field will be used as the job input
+ * key, and the contents of the 'value' field will be used as the job output value.</p>
+ *
+ * @param <K> The type of the Avro key to read.
+ * @param <V> The type of the Avro value to read.
+ */
+public class AvroKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
+  private static final Logger LOG = LoggerFactory.getLogger(AvroKeyValueInputFormat.class);
+
+  /**
+   * Creates a record reader over the key/value generic records of an input split.
+   *
+   * <p>Avro container files that contain generic records with the two fields 'key' and
+   * 'value' are expected.  The contents of the 'key' field will be used as the job input
+   * key, and the contents of the 'value' field will be used as the job input value.</p>
+   *
+   * @param split The input split to read records from.
+   * @param context The task attempt context.
+   * @return A reader producing AvroKey/AvroValue pairs.
+   */
+  @Override
+  public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    // Both reader schemas are optional; a null schema means "use the writer schema".
+    Schema keyReaderSchema = AvroJob.getInputKeySchema(context.getConfiguration());
+    if (null == keyReaderSchema) {
+      LOG.warn("Key reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
+      LOG.info("Using a key reader schema equal to the writer schema.");
+    }
+    Schema valueReaderSchema = AvroJob.getInputValueSchema(context.getConfiguration());
+    if (null == valueReaderSchema) {
+      LOG.warn("Value reader schema was not set. Use AvroJob.setInputValueSchema() if desired.");
+      LOG.info("Using a value reader schema equal to the writer schema.");
+    }
+    return new AvroKeyValueRecordReader<K, V>(keyReaderSchema, valueReaderSchema);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FileOutputFormat for writing Avro container files of key/value pairs.
+ *
+ * <p>Since Avro container files can only contain records (not key/value pairs), this
+ * output format puts the key and value into an Avro generic record with two fields, named
+ * 'key' and 'value'.</p>
+ *
+ * <p>The keys and values given to this output format may be Avro objects wrapped in
+ * <code>AvroKey</code> or <code>AvroValue</code> objects.  The basic Writable types are
+ * also supported (e.g., IntWritable, Text); they will be converted to their corresponding
+ * Avro types.</p>
+ *
+ * @param <K> The type of key. If an Avro type, it must be wrapped in an <code>AvroKey</code>.
+ * @param <V> The type of value. If an Avro type, it must be wrapped in an <code>AvroValue</code>.
+ */
+public class AvroKeyValueOutputFormat<K, V> extends AvroOutputFormatBase<K, V> {
+  /**
+   * Creates a record writer that wraps each key/value pair in a generic record.
+   *
+   * @param context The task attempt context.
+   * @return A writer that appends key/value generic records to an Avro container file.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException {
+    // Build converters that translate the job's output key/value classes into Avro datums.
+    final AvroDatumConverterFactory factory =
+        new AvroDatumConverterFactory(context.getConfiguration());
+    final AvroDatumConverter<K, ?> keyConverter =
+        factory.create((Class<K>) context.getOutputKeyClass());
+    final AvroDatumConverter<V, ?> valueConverter =
+        factory.create((Class<V>) context.getOutputValueClass());
+
+    return new AvroKeyValueRecordWriter<K, V>(keyConverter, valueConverter,
+        getCompressionCodec(context), getAvroFileOutputStream(context));
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordReader.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordReader.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordReader.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+
+/**
+ * Reads Avro generic records from an Avro container file, where the records contain two
+ * fields: 'key' and 'value'.
+ *
+ * <p>The contents of the 'key' field will be parsed into an AvroKey object. The contents
+ * of the 'value' field will be parsed into an AvroValue object.</p>
+ *
+ * @param <K> The type of the Avro key to read.
+ * @param <V> The type of the Avro value to read.
+ */
+public class AvroKeyValueRecordReader<K, V>
+    extends AvroRecordReaderBase<AvroKey<K>, AvroValue<V>, GenericRecord> {
+  /** Reusable wrapper holding the key of the record the reader is positioned on. */
+  private final AvroKey<K> mCurrentKey;
+
+  /** Reusable wrapper holding the value of the record the reader is positioned on. */
+  private final AvroValue<V> mCurrentValue;
+
+  /**
+   * Constructor.
+   *
+   * @param keyReaderSchema The reader schema for the key within the generic record.
+   * @param valueReaderSchema The reader schema for the value within the generic record.
+   */
+  public AvroKeyValueRecordReader(Schema keyReaderSchema, Schema valueReaderSchema) {
+    super(AvroKeyValue.getSchema(keyReaderSchema, valueReaderSchema));
+    mCurrentKey = new AvroKey<K>(null);
+    mCurrentValue = new AvroValue<V>(null);
+  }
+
+  /**
+   * Advances to the next record, splitting it into the key and value wrappers.
+   * Clears both wrappers when the input is exhausted.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!super.nextKeyValue()) {
+      mCurrentKey.datum(null);
+      mCurrentValue.datum(null);
+      return false;
+    }
+    AvroKeyValue<K, V> pair = new AvroKeyValue<K, V>(getCurrentRecord());
+    mCurrentKey.datum(pair.getKey());
+    mCurrentValue.datum(pair.getValue());
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public AvroKey<K> getCurrentKey() throws IOException, InterruptedException {
+    return mCurrentKey;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public AvroValue<V> getCurrentValue() throws IOException, InterruptedException {
+    return mCurrentValue;
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Writes key/value pairs to an Avro container file.
+ *
+ * <p>Each entry in the Avro container file will be a generic record with two fields,
+ * named 'key' and 'value'.  The input types may be basic Writable objects like Text or
+ * IntWritable, or they may be AvroWrapper subclasses (AvroKey or AvroValue).  Writable
+ * objects will be converted to their corresponding Avro types when written to the generic
+ * record key/value pair.</p>
+ *
+ * @param <K> The type of key to write.
+ * @param <V> The type of value to write.
+ */
+public class AvroKeyValueRecordWriter<K, V> extends RecordWriter<K, V> {
+  /** A writer for the Avro container file. */
+  private final DataFileWriter<GenericRecord> mAvroFileWriter;
+
+  /** The writer schema for the generic record entries of the Avro container file. */
+  private final Schema mKeyValuePairSchema;
+
+  /** A reusable Avro generic record for writing key/value pairs to the file. */
+  private final AvroKeyValue<Object, Object> mOutputRecord;
+
+  /** A helper object that converts the input key to an Avro datum. */
+  private final AvroDatumConverter<K, ?> mKeyConverter;
+
+  /** A helper object that converts the input value to an Avro datum. */
+  private final AvroDatumConverter<V, ?> mValueConverter;
+
+  /**
+   * Constructor.
+   *
+   * @param keyConverter A converter that turns job output keys into Avro datums.
+   * @param valueConverter A converter that turns job output values into Avro datums.
+   * @param compressionCodec A compression codec factory for the Avro container file.
+   * @param outputStream The output stream to write the Avro container file to.
+   * @throws IOException If the record writer cannot be opened.
+   */
+  public AvroKeyValueRecordWriter(AvroDatumConverter<K, ?> keyConverter,
+      AvroDatumConverter<V, ?> valueConverter, CodecFactory compressionCodec,
+      OutputStream outputStream) throws IOException {
+    // Create the generic record schema for the key/value pair.
+    mKeyValuePairSchema = AvroKeyValue.getSchema(
+        keyConverter.getWriterSchema(), valueConverter.getWriterSchema());
+
+    // Create an Avro container file and a writer to it.
+    mAvroFileWriter = new DataFileWriter<GenericRecord>(
+        new GenericDatumWriter<GenericRecord>(mKeyValuePairSchema));
+    mAvroFileWriter.setCodec(compressionCodec);
+    mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
+
+    // Keep a reference to the converters.
+    mKeyConverter = keyConverter;
+    mValueConverter = valueConverter;
+
+    // Create a reusable output record.  NOTE(review): reusing one mutable record means
+    // this writer is presumably not safe for concurrent write() calls — confirm if shared.
+    mOutputRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema));
+  }
+
+  /**
+   * Gets the writer schema for the key/value pair generic record.
+   *
+   * @return The writer schema used for entries of the Avro container file.
+   */
+  public Schema getWriterSchema() {
+    return mKeyValuePairSchema;
+  }
+
+  /** Converts the key and value to Avro datums and appends them as one generic record. */
+  @Override
+  public void write(K key, V value) throws IOException {
+    mOutputRecord.setKey(mKeyConverter.convert(key));
+    mOutputRecord.setValue(mValueConverter.convert(value));
+    mAvroFileWriter.append(mOutputRecord.get());
+  }
+
+  /** Closes the underlying Avro container file, flushing any buffered records. */
+  @Override
+  public void close(TaskAttemptContext context) throws IOException {
+    mAvroFileWriter.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Abstract base class for output formats that write Avro container files.
+ *
+ * @param <K> The type of key to write.
+ * @param <V> The type of value to write.
+ */
+public abstract class AvroOutputFormatBase<K, V> extends FileOutputFormat<K, V> {
+  /**
+   * Gets the configured compression codec from the task context.
+   *
+   * <p>Returns a deflate codec when output compression is enabled for the job,
+   * and the null (no-op) codec otherwise.</p>
+   *
+   * @param context The task attempt context.
+   * @return The compression codec to use for the output Avro container file.
+   */
+  protected static CodecFactory getCompressionCodec(TaskAttemptContext context) {
+    if (FileOutputFormat.getCompressOutput(context)) {
+      // Deflate compression, at the level configured via AvroOutputFormat's
+      // deflate-level key (falling back to its default level).
+      int compressionLevel = context.getConfiguration().getInt(
+          org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
+          org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+      return CodecFactory.deflateCodec(compressionLevel);
+    }
+
+    // No compression.
+    return CodecFactory.nullCodec();
+  }
+
+  /**
+   * Gets the target output stream where the Avro container file should be written.
+   *
+   * <p>Creates the task's default work file (with the Avro file extension) on the
+   * filesystem that owns its path.</p>
+   *
+   * @param context The task attempt context.
+   * @return The target output stream.
+   * @throws IOException If the output file cannot be created.
+   */
+  protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
+    Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
+    return path.getFileSystem(context.getConfiguration()).create(path);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,181 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for <code>RecordReader</code>s that read Avro container files.
+ *
+ * @param <K> The type of key the record reader should generate.
+ * @param <V> The type of value the record reader should generate.
+ * @param <T> The type of the entries within the Avro container file being read.
+ */
+public abstract class AvroRecordReaderBase<K, V, T> extends RecordReader<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(AvroRecordReaderBase.class);
+
+  /** The reader schema for the records within the input Avro container file. */
+  private final Schema mReaderSchema;
+
+  /** The current record from the Avro container file being read. */
+  private T mCurrentRecord;
+
+  /** A reader for the Avro container file containing the current input split. */
+  private DataFileReader<T> mAvroFileReader;
+
+  /**
+   * The byte offset into the Avro container file where the first block that fits
+   * completely within the current input split begins.
+   */
+  private long mStartPosition;
+
+  /** The byte offset into the Avro container file where the current input split ends. */
+  private long mEndPosition;
+
+  /**
+   * Constructor.
+   *
+   * @param readerSchema The reader schema for the records of the Avro container file.
+   */
+  protected AvroRecordReaderBase(Schema readerSchema) {
+    mReaderSchema = readerSchema;
+    mCurrentRecord = null;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    if (!(inputSplit instanceof FileSplit)) {
+      throw new IllegalArgumentException("Only compatible with FileSplits.");
+    }
+    FileSplit fileSplit = (FileSplit) inputSplit;
+
+    // Open a seekable input stream to the Avro container file.
+    SeekableInput seekableFileInput
+        = createSeekableInput(context.getConfiguration(), fileSplit.getPath());
+
+    // Wrap the seekable input stream in an Avro DataFileReader.
+    mAvroFileReader = createAvroFileReader(seekableFileInput,
+        new SpecificDatumReader<T>(mReaderSchema));
+
+    // Initialize the start and end offsets into the file based on the boundaries of the
+    // input split we're responsible for.  We will read the first block that begins
+    // after the input split start boundary.  We will read up to but not including the
+    // first block that starts after input split end boundary.
+
+    // Sync to the closest block/record boundary just after beginning of our input split.
+    mAvroFileReader.sync(fileSplit.getStart());
+
+    // Initialize the start position to the beginning of the first block of the input split.
+    mStartPosition = mAvroFileReader.previousSync();
+
+    // Initialize the end position to the end of the input split (this isn't necessarily
+    // on a block boundary, so using this for reporting progress will be approximate).
+    mEndPosition = fileSplit.getStart() + fileSplit.getLength();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    assert null != mAvroFileReader;
+
+    // Stop once the reader has crossed the first sync marker past our split's end;
+    // blocks starting after the boundary belong to the next split.
+    if (mAvroFileReader.hasNext() && !mAvroFileReader.pastSync(mEndPosition)) {
+      mCurrentRecord = mAvroFileReader.next(mCurrentRecord);
+      return true;
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    assert null != mAvroFileReader;
+
+    if (mEndPosition == mStartPosition) {
+      // Trivial empty input split.
+      return 0.0f;
+    }
+    long bytesRead = mAvroFileReader.previousSync() - mStartPosition;
+    long bytesTotal = mEndPosition - mStartPosition;
+    // Parameterized logging avoids building the message string when debug is disabled.
+    LOG.debug("Progress: bytesRead={}, bytesTotal={}", bytesRead, bytesTotal);
+    return Math.min(1.0f, (float) bytesRead / (float) bytesTotal);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close() throws IOException {
+    if (null != mAvroFileReader) {
+      try {
+        mAvroFileReader.close();
+      } finally {
+        // Null out the reader even if close() throws, so a second close() is a no-op.
+        mAvroFileReader = null;
+      }
+    }
+  }
+
+  /**
+   * Gets the current record read from the Avro container file.
+   *
+   * <p>Calling <code>nextKeyValue()</code> moves this to the next record.</p>
+   *
+   * @return The current Avro record (may be null if no record has been read).
+   */
+  protected T getCurrentRecord() {
+    return mCurrentRecord;
+  }
+
+  /**
+   * Creates a seekable input stream to an Avro container file.
+   *
+   * @param conf The hadoop configuration.
+   * @param path The path to the avro container file.
+   * @return A new seekable input stream over the file at <code>path</code>.
+   * @throws IOException If there is an error reading from the path.
+   */
+  protected SeekableInput createSeekableInput(Configuration conf, Path path)
+      throws IOException {
+    return new FsInput(path, conf);
+  }
+
+  /**
+   * Creates an Avro container file reader from a seekable input stream.
+   *
+   * @param input The input containing the Avro container file.
+   * @param datumReader The reader to use for the individual records in the Avro container file.
+   * @return A new container-file reader over <code>input</code> using <code>datumReader</code>.
+   * @throws IOException If there is an error reading from the input stream.
+   */
+  protected DataFileReader<T> createAvroFileReader(
+      SeekableInput input, DatumReader<T> datumReader) throws IOException {
+    return new DataFileReader<T>(input, datumReader);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileInputFormat.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileInputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileInputFormat.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,144 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.avro.hadoop.io.AvroSequenceFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+/**
+ * An input format for reading from AvroSequenceFiles (sequence files that support Avro data).
+ *
+ * @param <K> The input key type.
+ * @param <V> The input value type.
+ */
+public class AvroSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
+  /** {@inheritDoc} */
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException {
+    return new AvroSequenceFileRecordReader();
+  }
+
+  /**
+   * Reads records from a SequenceFile that supports Avro data.
+   *
+   * <p>This class is based on Hadoop's SequenceFileRecordReader, modified to construct an
+   * AvroSequenceFile.Reader instead of a SequenceFile.Reader.</p>
+   */
+  protected class AvroSequenceFileRecordReader extends RecordReader<K, V> {
+    private SequenceFile.Reader mReader;
+    private long mStart;
+    private long mEnd;
+    private boolean mHasMoreData;
+    private K mCurrentKey;
+    private V mCurrentValue;
+
+    /** {@inheritDoc} */
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) split;
+      Configuration conf = context.getConfiguration();
+      Path path = fileSplit.getPath();
+      FileSystem fs = path.getFileSystem(conf);
+
+      // Configure the SequenceFile reader.
+      AvroSequenceFile.Reader.Options options = new AvroSequenceFile.Reader.Options()
+          .withFileSystem(fs)
+          .withInputPath(path)
+          .withConfiguration(conf);
+      Schema keySchema = AvroJob.getInputKeySchema(conf);
+      if (null != keySchema) {
+        options.withKeySchema(keySchema);
+      }
+      Schema valueSchema = AvroJob.getInputValueSchema(conf);
+      if (null != valueSchema) {
+        options.withValueSchema(valueSchema);
+      }
+
+      mReader = new AvroSequenceFile.Reader(options);
+      mEnd = fileSplit.getStart() + fileSplit.getLength();
+
+      if (fileSplit.getStart() > mReader.getPosition()) {
+        // Sync to the beginning of the input split.
+        mReader.sync(fileSplit.getStart());
+      }
+
+      mStart = mReader.getPosition();
+      mHasMoreData = mStart < mEnd;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (!mHasMoreData) {
+        return false;
+      }
+      long pos = mReader.getPosition();
+      mCurrentKey = (K) mReader.next(mCurrentKey);
+      if (null == mCurrentKey || (pos >= mEnd && mReader.syncSeen())) {
+        mHasMoreData = false;
+        mCurrentKey = null;
+        mCurrentValue = null;
+      } else {
+        mCurrentValue = (V) mReader.getCurrentValue(mCurrentValue);
+      }
+      return mHasMoreData;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public K getCurrentKey() {
+      return mCurrentKey;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public V getCurrentValue() {
+      return mCurrentValue;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float getProgress() throws IOException {
+      if (mEnd == mStart) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (mReader.getPosition() - mStart) / (float)(mEnd - mStart));
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void close() throws IOException {
+      mReader.close();
+    }
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileOutputFormat.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileOutputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileOutputFormat.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,127 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A sequence file output format that knows how to write AvroKeys and AvroValues in
+ * addition to Writables.
+ *
+ * <p>NOTE(review): this file declares <code>package org.apache.avro.hadoop.io</code>
+ * while the commit path places it under <code>org/apache/avro/mapreduce</code> —
+ * confirm which location is intended.</p>
+ *
+ * @param <K> The job output key type (may be a Writable, AvroKey).
+ * @param <V> The job output value type (may be a Writable, AvroValue).
+ */
+public class AvroSequenceFileOutputFormat<K, V> extends FileOutputFormat<K, V> {
+  /** Configuration key for storing the type of compression for the target sequence file. */
+  private static final String CONF_COMPRESSION_TYPE = "mapred.output.compression.type";
+
+  /** The default compression type for the target sequence file. */
+  private static final CompressionType DEFAULT_COMPRESSION_TYPE = CompressionType.RECORD;
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    // Configure compression if requested.
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(context)) {
+      // Find the kind of compression to do.
+      compressionType = getOutputCompressionType(conf);
+
+      // Instantiate the right codec.  getOutputCompressorClass already returns
+      // Class<? extends CompressionCodec>, so no raw Class<?> or cast is needed.
+      codec = ReflectionUtils.newInstance(
+          getOutputCompressorClass(context, DefaultCodec.class), conf);
+    }
+
+    // Get the path of the output file.
+    Path outputFile = getDefaultWorkFile(context, "");
+    FileSystem fs = outputFile.getFileSystem(conf);
+
+    // Configure the writer; key/value schemas are only set when the job declared them,
+    // since the file may also hold plain Writables.
+    AvroSequenceFile.Writer.Options options = new AvroSequenceFile.Writer.Options()
+        .withFileSystem(fs)
+        .withConfiguration(conf)
+        .withOutputPath(outputFile)
+        .withKeyClass(context.getOutputKeyClass())
+        .withValueClass(context.getOutputValueClass())
+        .withProgressable(context)
+        .withCompressionType(compressionType)
+        .withCompressionCodec(codec);
+    Schema keySchema = AvroJob.getOutputKeySchema(conf);
+    if (null != keySchema) {
+      options.withKeySchema(keySchema);
+    }
+    Schema valueSchema = AvroJob.getOutputValueSchema(conf);
+    if (null != valueSchema) {
+      options.withValueSchema(valueSchema);
+    }
+    final SequenceFile.Writer out = AvroSequenceFile.createWriter(options);
+
+    // Thin adapter from the mapreduce RecordWriter contract onto the sequence file writer.
+    return new RecordWriter<K, V>() {
+      @Override
+      public void write(K key, V value) throws IOException {
+        out.append(key, value);
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException {
+        out.close();
+      }
+    };
+  }
+
+  /**
+   * Sets the type of compression for the output sequence file.
+   *
+   * @param job The job configuration.
+   * @param compressionType The compression type for the target sequence file.
+   */
+  public static void setOutputCompressionType(Job job, CompressionType compressionType) {
+    setCompressOutput(job, true);
+    job.getConfiguration().set(CONF_COMPRESSION_TYPE, compressionType.name());
+  }
+
+  /**
+   * Gets type of compression for the output sequence file.
+   *
+   * @param conf The job configuration.
+   * @return The compression type (defaults to {@link CompressionType#RECORD} when unset).
+   */
+  public static CompressionType getOutputCompressionType(Configuration conf) {
+    String typeName = conf.get(CONF_COMPRESSION_TYPE, DEFAULT_COMPRESSION_TYPE.name());
+    return CompressionType.valueOf(typeName);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroSequenceFileOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/hadoop/io/SequenceFileBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/hadoop/io/SequenceFileBase.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/hadoop/io/SequenceFileBase.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/hadoop/io/SequenceFileBase.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progressable;
+
+/** Base class to permit subclassing SequenceFile.  Provides access to two
+ * package-protected classes within SequenceFile.  Declared in the
+ * org.apache.hadoop.io package solely to gain that package-private access. */
+public abstract class SequenceFileBase {
+  /** Not instantiable directly; exists only to be extended. */
+  protected SequenceFileBase() {}
+
+  /**
+   * Bridge to the package-private SequenceFile.BlockCompressWriter so that code
+   * outside this package can subclass it.  Parameters are forwarded verbatim.
+   */
+  protected abstract static class BlockCompressWriterBase
+    extends SequenceFile.BlockCompressWriter {
+    protected BlockCompressWriterBase(FileSystem fs, Configuration conf,
+                                      Path name,
+                                      Class<?> keyClass, Class<?> valClass,
+                                      int bufferSize, short replication,
+                                      long blockSize, CompressionCodec codec,
+                                      Progressable progress, Metadata metadata)
+      throws IOException {
+      super(fs, conf, name, keyClass, valClass, bufferSize, replication,
+            blockSize, codec, progress, metadata);
+    }
+  }
+
+  /**
+   * Bridge to the package-private SequenceFile.RecordCompressWriter so that code
+   * outside this package can subclass it.  Parameters are forwarded verbatim.
+   */
+  protected abstract static class RecordCompressWriterBase
+    extends SequenceFile.RecordCompressWriter {
+    protected RecordCompressWriterBase(FileSystem fs, Configuration conf,
+                                       Path name,
+                                       Class<?> keyClass, Class<?> valClass,
+                                       int bufferSize, short replication,
+                                       long blockSize, CompressionCodec codec,
+                                       Progressable progress, Metadata metadata)
+      throws IOException {
+      super(fs, conf, name, keyClass, valClass, bufferSize, replication,
+            blockSize, codec, progress, metadata);
+    }
+  }
+
+}

Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/hadoop/io/SequenceFileBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/avro/TextStats.avsc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/avro/TextStats.avsc?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/avro/TextStats.avsc (added)
+++ avro/trunk/lang/java/mapred/src/test/avro/TextStats.avsc Fri Apr 13 19:03:12 2012
@@ -0,0 +1,9 @@
+{
+  "namespace": "org.apache.avro.mapreduce",
+  "type": "record",
+  "name": "TextStats",
+  "fields": [
+    { "name": "name", "type": "string", "default": "" },
+    { "name": "count", "type": "int", "default": 0 }
+  ]
+}



Mime
View raw message