parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [50/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:47 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
new file mode 100644
index 0000000..f76f367
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -0,0 +1,700 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Assembles one Avro record from a Parquet group.
+ * <p>
+ * A child {@link Converter} is created for every field of the Parquet group;
+ * each child stores its value into the current record at the position of the
+ * matching Avro field (matched by exact name first, then by Avro alias). Avro
+ * fields that have no Parquet counterpart but declare a non-null default are
+ * filled with a copy of that default in {@link #end()}.
+ * <p>
+ * Records are instances of the class that the {@link SpecificData} model
+ * resolves for the Avro schema, or {@link GenericData.Record} when the model is
+ * not a {@link SpecificData} or resolves no class.
+ *
+ * @param <T> the record type produced by this converter
+ */
+class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter {
+
+  /** Receives each finished record in {@link #end()}; may be {@code null}. */
+  private final ParentValueContainer parent;
+  /** The record being assembled between {@link #start()} and {@link #end()}. */
+  protected T currentRecord;
+  /** Child converters, indexed by Parquet field position. */
+  private final Converter[] converters;
+
+  private final Schema avroSchema;
+  /** Specific record class for {@code avroSchema}, or {@code null} to build generic records. */
+  private final Class<? extends IndexedRecord> specificClass;
+
+  private final GenericData model;
+  /** Default values for Avro fields that do not appear in the Parquet schema. */
+  private final Map<Schema.Field, Object> recordDefaults = new HashMap<Schema.Field, Object>();
+
+  /** Creates a converter with no parent that uses {@link SpecificData#get()}. */
+  public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) {
+    this(parquetSchema, avroSchema, SpecificData.get());
+  }
+
+  /** Creates a converter with no parent that uses the given data model. */
+  public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema,
+      GenericData baseModel) {
+    this(null, parquetSchema, avroSchema, baseModel);
+  }
+
+  /** Creates a nested converter that uses {@link SpecificData#get()}. */
+  public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType
+      parquetSchema, Schema avroSchema) {
+    this(parent, parquetSchema, avroSchema, SpecificData.get());
+  }
+
+  /**
+   * Creates a converter that reads {@code parquetSchema} into records of
+   * {@code avroSchema}.
+   *
+   * @param parent receives each completed record in {@link #end()}; may be
+   *     {@code null}, in which case the record is only reachable through
+   *     {@link #getCurrentRecord()}
+   * @param parquetSchema the Parquet group being read
+   * @param avroSchema the Avro record schema to produce
+   * @param baseModel the data model; specific records are built only when it is
+   *     a {@link SpecificData} that resolves a class for {@code avroSchema},
+   *     otherwise {@link GenericData#get()} is used
+   * @throws InvalidRecordException if a Parquet field matches no Avro field, or
+   *     if two Parquet fields resolve to the same Avro field
+   */
+  public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType
+      parquetSchema, Schema avroSchema, GenericData baseModel) {
+    this.parent = parent;
+    this.avroSchema = avroSchema;
+    int schemaSize = parquetSchema.getFieldCount();
+    this.converters = new Converter[schemaSize];
+    this.specificClass = baseModel instanceof SpecificData ?
+        ((SpecificData) baseModel).getClass(avroSchema) : null;
+
+    // Generic records are always built with the plain GenericData model.
+    this.model = this.specificClass == null ? GenericData.get() : baseModel;
+
+    // Avro field name -> record position. Entries are removed as Parquet fields
+    // claim them, so whatever remains afterwards exists only in the Avro schema.
+    Map<String, Integer> avroFieldIndexes = new HashMap<String, Integer>();
+    int avroFieldIndex = 0;
+    for (Schema.Field field: avroSchema.getFields()) {
+      avroFieldIndexes.put(field.name(), avroFieldIndex++);
+    }
+    int parquetFieldIndex = 0;
+    for (Type parquetField: parquetSchema.getFields()) {
+      Schema.Field avroField = getAvroField(parquetField.getName());
+      Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema());
+      Integer avroIndex = avroFieldIndexes.remove(avroField.name());
+      if (avroIndex == null) {
+        // The Avro field was already claimed by an earlier Parquet field (for
+        // example once by name and once by alias); unboxing would throw an NPE.
+        throw new InvalidRecordException(String.format(
+            "Parquet/Avro schema mismatch. Avro field '%s' is matched by more than one Parquet field.",
+            avroField.name()));
+      }
+      final int finalAvroIndex = avroIndex;
+      converters[parquetFieldIndex++] = newConverter(nonNullSchema, parquetField, model, new ParentValueContainer() {
+        @Override
+        void add(Object value) {
+          AvroIndexedRecordConverter.this.set(finalAvroIndex, value);
+        }
+      });
+    }
+    // store defaults for any new Avro fields from avroSchema that are not in the writer schema (parquetSchema)
+    for (String fieldName : avroFieldIndexes.keySet()) {
+      Schema.Field field = avroSchema.getField(fieldName);
+      if (field.schema().getType() == Schema.Type.NULL) {
+        continue; // skip null since Parquet does not write nulls
+      }
+      if (field.defaultValue() == null) {
+        continue; // field has no default
+      }
+      // Only evaluated once a default is known to exist (checked above).
+      Object defaultValue = model.getDefaultValue(field);
+      if (defaultValue == null) {
+        continue; // nothing to fill in
+      }
+      recordDefaults.put(field, defaultValue);
+    }
+  }
+
+  /**
+   * Finds the Avro field that a Parquet field is read into. A field whose name
+   * equals {@code parquetFieldName} takes precedence; otherwise the first Avro
+   * field listing {@code parquetFieldName} among its aliases is used.
+   *
+   * @param parquetFieldName name of the field in the Parquet schema
+   * @return the matching Avro field, never {@code null}
+   * @throws InvalidRecordException if no field matches by name or alias
+   */
+  private Schema.Field getAvroField(String parquetFieldName) {
+    Schema.Field avroField = avroSchema.getField(parquetFieldName);
+    if (avroField != null) {
+      // An exact name match wins over aliases declared on other fields.
+      return avroField;
+    }
+    for (Schema.Field f : avroSchema.getFields()) {
+      if (f.aliases().contains(parquetFieldName)) {
+        return f;
+      }
+    }
+    throw new InvalidRecordException(String.format("Parquet/Avro schema mismatch. Avro field '%s' not found.",
+        parquetFieldName));
+  }
+
+  /**
+   * Creates the converter for a single value of Avro type {@code schema}.
+   *
+   * @param schema the non-null Avro schema of the value
+   * @param type the Parquet type the value is stored as
+   * @param model the data model passed on to nested converters
+   * @param parent receives the converted value
+   * @return a converter for the value
+   * @throws UnsupportedOperationException for Avro types with no converter
+   */
+  private static Converter newConverter(Schema schema, Type type,
+      GenericData model, ParentValueContainer parent) {
+    switch (schema.getType()) {
+      case BOOLEAN:
+        return new FieldBooleanConverter(parent);
+      case INT:
+        return new FieldIntegerConverter(parent);
+      case LONG:
+        return new FieldLongConverter(parent);
+      case FLOAT:
+        return new FieldFloatConverter(parent);
+      case DOUBLE:
+        return new FieldDoubleConverter(parent);
+      case BYTES:
+        return new FieldBytesConverter(parent);
+      case STRING:
+        // Dictionary decoding is only enabled for UTF8-annotated columns.
+        return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8);
+      case RECORD:
+        return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema, model);
+      case ENUM:
+        return new FieldEnumConverter(parent, schema, model);
+      case ARRAY:
+        return new AvroArrayConverter(parent, type.asGroupType(), schema, model);
+      case MAP:
+        return new MapConverter(parent, type.asGroupType(), schema, model);
+      case UNION:
+        return new AvroUnionConverter(parent, type, schema, model);
+      case FIXED:
+        return new FieldFixedConverter(parent, schema, model);
+      default:
+        throw new UnsupportedOperationException(String.format("Cannot convert Avro type: %s" +
+            " (Parquet type: %s) ", schema, type));
+    }
+  }
+
+  /** Stores {@code value} at Avro position {@code index} of the current record. */
+  private void set(int index, Object value) {
+    this.currentRecord.put(index, value);
+  }
+
+  @Override
+  public Converter getConverter(int fieldIndex) {
+    return converters[fieldIndex];
+  }
+
+  /** Starts a new, empty record. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void start() {
+    // Should do the right thing whether it is generic or specific
+    this.currentRecord = (T) ((this.specificClass == null) ?
+            new GenericData.Record(avroSchema) :
+            ((SpecificData) model).newInstance(specificClass, avroSchema));
+  }
+
+  /** Fills in defaults for Avro-only fields and hands the record to the parent. */
+  @Override
+  public void end() {
+    fillInDefaults();
+    if (parent != null) {
+      parent.add(currentRecord);
+    }
+  }
+
+  /** Writes a fresh copy of each stored default into the current record. */
+  private void fillInDefaults() {
+    for (Map.Entry<Schema.Field, Object> entry : recordDefaults.entrySet()) {
+      Schema.Field f = entry.getKey();
+      // replace following with model.deepCopy once AVRO-1455 is being used
+      Object defaultValue = deepCopy(f.schema(), entry.getValue());
+      this.currentRecord.put(f.pos(), defaultValue);
+    }
+  }
+
+  /**
+   * Copies a default value so records do not share mutable state. Boolean and
+   * numeric values are returned as-is; everything else goes through
+   * {@code model.deepCopy}.
+   */
+  private Object deepCopy(Schema schema, Object value) {
+    switch (schema.getType()) {
+      case BOOLEAN:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return value;
+      default:
+        return model.deepCopy(schema, value);
+    }
+  }
+
+  /** Returns the most recently started record. */
+  T getCurrentRecord() {
+    return currentRecord;
+  }
+
+  /** Sink used by child converters to pass a converted value to their parent. */
+  static abstract class ParentValueContainer {
+
+    /**
+     * Adds the value to the parent.
+     */
+    abstract void add(Object value);
+
+  }
+
+  /** Passes boolean values through unchanged. */
+  static final class FieldBooleanConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+
+    public FieldBooleanConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addBoolean(boolean value) {
+      parent.add(value);
+    }
+
+  }
+
+  /** Passes int values through unchanged. */
+  static final class FieldIntegerConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+
+    public FieldIntegerConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add(value);
+    }
+
+  }
+
+  /** Produces longs; int-encoded values are widened. */
+  static final class FieldLongConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+
+    public FieldLongConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add(Long.valueOf(value));
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.add(value);
+    }
+
+  }
+
+  /** Produces floats; int- and long-encoded values are converted. */
+  static final class FieldFloatConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+
+    public FieldFloatConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add(Float.valueOf(value));
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.add(Float.valueOf(value));
+    }
+
+    @Override
+    final public void addFloat(float value) {
+      parent.add(value);
+    }
+
+  }
+
+  /** Produces doubles; int-, long- and float-encoded values are converted. */
+  static final class FieldDoubleConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+
+    public FieldDoubleConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add(Double.valueOf(value));
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.add(Double.valueOf(value));
+    }
+
+    @Override
+    final public void addFloat(float value) {
+      parent.add(Double.valueOf(value));
+    }
+
+    @Override
+    final public void addDouble(double value) {
+      parent.add(value);
+    }
+
+  }
+
+  /** Wraps binary values in a {@link ByteBuffer}. */
+  static final class FieldBytesConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+
+    public FieldBytesConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(ByteBuffer.wrap(value.getBytes()));
+    }
+
+  }
+
+  /**
+   * Decodes binary values as UTF-8 strings. When dictionary support is
+   * enabled, each dictionary entry is decoded once in
+   * {@link #setDictionary(Dictionary)} and reused by id.
+   */
+  static final class FieldStringConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+    private final boolean dictionarySupport;
+    private String[] dict;
+
+    public FieldStringConverter(ParentValueContainer parent, boolean dictionarySupport) {
+      this.parent = parent;
+      this.dictionarySupport = dictionarySupport;
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(value.toStringUsingUTF8());
+    }
+
+    @Override
+    public boolean hasDictionarySupport() {
+      return dictionarySupport;
+    }
+
+    @Override
+    public void setDictionary(Dictionary dictionary) {
+      dict = new String[dictionary.getMaxId() + 1];
+      for (int i = 0; i <= dictionary.getMaxId(); i++) {
+        dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
+      }
+    }
+
+    @Override
+    public void addValueFromDictionary(int dictionaryId) {
+      parent.add(dict[dictionaryId]);
+    }
+  }
+
+  /**
+   * Converts enum symbols. If the model resolves a Java enum class for the
+   * schema, the symbol is mapped to that constant; otherwise the symbol is
+   * passed on as a {@code String}.
+   */
+  static final class FieldEnumConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+    private final Class<? extends Enum> enumClass;
+
+    public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema,
+        GenericData model) {
+      this.parent = parent;
+      this.enumClass = model instanceof SpecificData ?
+          ((SpecificData) model).getClass(enumSchema) :
+          SpecificData.get().getClass(enumSchema);
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      Object enumValue = value.toStringUsingUTF8();
+      if (enumClass != null) {
+        enumValue = (Enum.valueOf(enumClass,(String)enumValue));
+      }
+      parent.add(enumValue);
+    }
+  }
+
+  /**
+   * Converts fixed values, either into {@link GenericData.Fixed} or, when a
+   * class is resolved for the schema, into that class via its
+   * {@code byte[]} constructor.
+   */
+  static final class FieldFixedConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+    private final Schema avroSchema;
+    private final Class<? extends GenericData.Fixed> fixedClass;
+    private final Constructor<? extends GenericData.Fixed> fixedClassCtor;
+
+    public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema,
+        GenericData model) {
+      this.parent = parent;
+      this.avroSchema = avroSchema;
+      this.fixedClass = model instanceof SpecificData ?
+          ((SpecificData) model).getClass(avroSchema) :
+          SpecificData.get().getClass(avroSchema);
+      if (fixedClass != null) {
+        try {
+          this.fixedClassCtor =
+              fixedClass.getConstructor(new Class[] { byte[].class });
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        this.fixedClassCtor = null;
+      }
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      if (fixedClass == null) {
+        parent.add(new GenericData.Fixed(avroSchema, value.getBytes()));
+      } else {
+        if (fixedClassCtor == null) {
+          throw new IllegalArgumentException(
+              "fixedClass specified but fixedClassCtor is null.");
+        }
+        try {
+          Object fixed = fixedClassCtor.newInstance(value.getBytes());
+          parent.add(fixed);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Converter for a list.
+   *
+   * <pre>
+   *   optional group the_list (LIST) { <-- this layer
+   *     repeated group array {
+   *       optional (type) element;
+   *     }
+   *   }
+   * </pre>
+   *
+   * This class also implements LIST element backward-compatibility rules.
+   *
+   * @param <T> The type of elements in the list
+   */
+  static final class AvroArrayConverter<T> extends GroupConverter {
+
+    private final ParentValueContainer parent;
+    private final Schema avroSchema;
+    private final Converter converter;
+    private GenericArray<T> array;
+
+    public AvroArrayConverter(ParentValueContainer parent, GroupType type,
+        Schema avroSchema, GenericData model) {
+      this.parent = parent;
+      this.avroSchema = avroSchema;
+      Schema elementSchema = this.avroSchema.getElementType();
+      Type repeatedType = type.getType(0);
+      // always determine whether the repeated type is the element type by
+      // matching it against the element schema.
+      if (isElementType(repeatedType, elementSchema)) {
+        // the element type is the repeated type (and required)
+        converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
+          @Override
+          @SuppressWarnings("unchecked")
+          void add(Object value) {
+            array.add((T) value);
+          }
+        });
+      } else {
+        // the element is wrapped in a synthetic group and may be optional
+        converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model);
+      }
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      return converter;
+    }
+
+    @Override
+    public void start() {
+      array = new GenericData.Array<T>(0, avroSchema);
+    }
+
+    @Override
+    public void end() {
+      parent.add(array);
+    }
+
+    /**
+     * Returns whether the given type is the element type of a list or is a
+     * synthetic group with one field that is the element type. This is
+     * determined by checking whether the type can be a synthetic group and by
+     * checking whether a potential synthetic group matches the expected schema.
+     * <p>
+     * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this
+     * method never guesses because the expected schema is known.
+     *
+     * @param repeatedType a type that may be the element type
+     * @param elementSchema the expected Schema for list elements
+     * @return {@code true} if the repeatedType is the element schema
+     */
+    static boolean isElementType(Type repeatedType, Schema elementSchema) {
+      if (repeatedType.isPrimitive() ||
+          repeatedType.asGroupType().getFieldCount() > 1) {
+        // The repeated type must be the element type because it is an invalid
+        // synthetic wrapper (must be a group with one field).
+        return true;
+      } else if (elementSchema != null &&
+          elementSchema.getType() == Schema.Type.RECORD &&
+          elementSchema.getFields().size() == 1 &&
+          elementSchema.getFields().get(0).name().equals(
+              repeatedType.asGroupType().getFieldName(0))) {
+        // The repeated type must be the element type because it matches the
+        // structure of the Avro element's schema.
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * Converter for list elements.
+     *
+     * <pre>
+     *   optional group the_list (LIST) {
+     *     repeated group array { <-- this layer
+     *       optional (type) element;
+     *     }
+     *   }
+     * </pre>
+     */
+    final class ElementConverter extends GroupConverter {
+      private T element;
+      private final Converter elementConverter;
+
+      public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) {
+        Type elementType = repeatedType.getType(0);
+        Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
+        this.elementConverter = newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() {
+          @Override
+          @SuppressWarnings("unchecked")
+          void add(Object value) {
+            ElementConverter.this.element = (T) value;
+          }
+        });
+      }
+
+      @Override
+      public Converter getConverter(int fieldIndex) {
+        Preconditions.checkArgument(
+            fieldIndex == 0, "Illegal field index: " + fieldIndex);
+        return elementConverter;
+      }
+
+      @Override
+      public void start() {
+        element = null;
+      }
+
+      @Override
+      public void end() {
+        // A missing optional element is added as null.
+        array.add(element);
+      }
+    }
+  }
+
+  /**
+   * Converts a Parquet group written for an Avro union. The group holds one
+   * field per non-null union member, in union order; at most one of them may
+   * be set per value, and if none is set {@code null} is passed to the parent.
+   */
+  static final class AvroUnionConverter<T> extends GroupConverter {
+
+    private final ParentValueContainer parent;
+    private final Converter[] memberConverters;
+    private Object memberValue = null;
+
+    public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema,
+                              Schema avroSchema, GenericData model) {
+      this.parent = parent;
+      GroupType parquetGroup = parquetSchema.asGroupType();
+      this.memberConverters = new Converter[ parquetGroup.getFieldCount()];
+
+      int parquetIndex = 0;
+      for (int index = 0; index < avroSchema.getTypes().size(); index++) {
+        Schema memberSchema = avroSchema.getTypes().get(index);
+        if (!memberSchema.getType().equals(Schema.Type.NULL)) {
+          Type memberType = parquetGroup.getType(parquetIndex);
+          memberConverters[parquetIndex] = newConverter(memberSchema, memberType, model, new ParentValueContainer() {
+            @Override
+            void add(Object value) {
+              Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type");
+              memberValue = value;
+            }
+          });
+          parquetIndex++; // Note: for nulls the parquetIndex is not increased
+        }
+      }
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      return memberConverters[fieldIndex];
+    }
+
+    @Override
+    public void start() {
+      memberValue = null;
+    }
+
+    @Override
+    public void end() {
+      parent.add(memberValue);
+    }
+  }
+
+  /**
+   * Converts a Parquet map group into a {@link HashMap}. Keys are decoded as
+   * UTF-8 strings from field 0 of the repeated key/value group and values are
+   * converted from field 1.
+   */
+  static final class MapConverter<V> extends GroupConverter {
+
+    private final ParentValueContainer parent;
+    private final Converter keyValueConverter;
+    private Map<String, V> map;
+
+    public MapConverter(ParentValueContainer parent, GroupType mapType,
+        Schema mapSchema, GenericData model) {
+      this.parent = parent;
+      GroupType repeatedKeyValueType = mapType.getType(0).asGroupType();
+      this.keyValueConverter = new MapKeyValueConverter(
+          repeatedKeyValueType, mapSchema, model);
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      return keyValueConverter;
+    }
+
+    @Override
+    public void start() {
+      this.map = new HashMap<String, V>();
+    }
+
+    @Override
+    public void end() {
+      parent.add(map);
+    }
+
+    /** Converts one repeated key/value entry and stores it in the map. */
+    final class MapKeyValueConverter extends GroupConverter {
+
+      private String key;
+      private V value;
+      private final Converter keyConverter;
+      private final Converter valueConverter;
+
+      public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema,
+          GenericData model) {
+        keyConverter = new PrimitiveConverter() {
+          @Override
+          final public void addBinary(Binary value) {
+            key = value.toStringUsingUTF8();
+          }
+        };
+
+        Type valueType = keyValueType.getType(1);
+        Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType());
+        valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() {
+          @Override
+          @SuppressWarnings("unchecked")
+          void add(Object value) {
+            MapKeyValueConverter.this.value = (V) value;
+          }
+        });
+      }
+
+      @Override
+      public Converter getConverter(int fieldIndex) {
+        if (fieldIndex == 0) {
+          return keyConverter;
+        } else if (fieldIndex == 1) {
+          return valueConverter;
+        }
+        throw new IllegalArgumentException("only the key (0) and value (1) fields expected: " + fieldIndex);
+      }
+
+      @Override
+      public void start() {
+        key = null;
+        value = null;
+      }
+
+      @Override
+      public void end() {
+        map.put(key, value);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java
new file mode 100644
index 0000000..42bfb09
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java
@@ -0,0 +1,84 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+/**
+ * A Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} for Parquet files.
+ */
+public class AvroParquetInputFormat<T> extends ParquetInputFormat<T> {
+
+  /**
+   * Overrides the Avro schema used when reading. When unset, the Avro schema
+   * that was used for writing is used instead.
+   * <p>
+   * Differences between the read and write schemas are resolved using
+   * <a href="http://avro.apache.org/docs/current/spec.html#Schema+Resolution">Avro's schema resolution rules</a>.
+   *
+   * @param job the job whose configuration receives the read schema
+   * @param avroReadSchema the Avro schema records should be read as
+   * @see #setRequestedProjection(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   */
+  public static void setAvroReadSchema(Job job, Schema avroReadSchema) {
+    AvroReadSupport.setAvroReadSchema(
+        ContextUtil.getConfiguration(job),
+        avroReadSchema);
+  }
+
+  /**
+   * Restricts reading to a subset of columns (projection pushdown). The
+   * projection is given as an Avro schema and converted into a Parquet schema
+   * for column projection, so unused columns are never read — useful when
+   * the full schema is large and only a few columns are needed.
+   * <p>
+   * When a projection is set, the Avro read schema must be compatible with it:
+   * any column left out of the projection must be either absent from, or
+   * optional in, the read schema. Use
+   * {@link #setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)}
+   * to provide such a read schema if needed.
+   *
+   * @param job the job whose configuration receives the projection
+   * @param requestedProjection Avro schema describing the columns to read
+   * @see #setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   */
+  public static void setRequestedProjection(Job job, Schema requestedProjection) {
+    AvroReadSupport.setRequestedProjection(
+        ContextUtil.getConfiguration(job),
+        requestedProjection);
+  }
+
+  /**
+   * Chooses the {@link AvroDataSupplier} implementation whose instance
+   * controls how the {@link org.apache.avro.specific.SpecificData} used to
+   * find Avro specific records is created.
+   *
+   * @param job the job whose configuration receives the supplier class
+   * @param supplierClass the supplier implementation to instantiate
+   */
+  public static void setAvroDataSupplier(
+      Job job, Class<? extends AvroDataSupplier> supplierClass) {
+    AvroReadSupport.setAvroDataSupplier(
+        ContextUtil.getConfiguration(job),
+        supplierClass);
+  }
+
+  /**
+   * Creates an input format whose records are materialized by
+   * {@link AvroReadSupport}.
+   */
+  public AvroParquetInputFormat() {
+    super(AvroReadSupport.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
new file mode 100644
index 0000000..afca74f
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
@@ -0,0 +1,50 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+/**
+ * A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files.
+ */
+public class AvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> {
+
+  /**
+   * Creates an output format that writes records through a new
+   * {@link AvroWriteSupport}.
+   */
+  public AvroParquetOutputFormat() {
+    super(new AvroWriteSupport());
+  }
+
+  /**
+   * Sets the Avro schema used for writing. The schema is translated into a
+   * Parquet schema so records can be written in Parquet format, and it is also
+   * stored in the Parquet metadata so records can be rebuilt as Avro objects at
+   * read time without a separately specified read schema.
+   *
+   * @param job the job whose configuration receives the schema
+   * @param schema the Avro schema of the records being written
+   * @see org.apache.parquet.avro.AvroParquetInputFormat#setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   */
+  public static void setSchema(Job job, Schema schema) {
+    AvroWriteSupport.setSchema(
+        ContextUtil.getConfiguration(job),
+        schema);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
new file mode 100644
index 0000000..40cf5eb
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
@@ -0,0 +1,70 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.hadoop.ParquetReader;
+
+/**
+ * Reads Avro {@link IndexedRecord}s (generic or specific) from a Parquet file
+ * using an {@link AvroReadSupport}.
+ *
+ * @param <T> the Avro record type produced by the reader
+ */
+public class AvroParquetReader<T extends IndexedRecord> extends ParquetReader<T> {
+
+  /**
+   * Returns a {@link ParquetReader} builder that reads Avro records from a file.
+   *
+   * @param file the Parquet file to read
+   * @param <T> the Avro record type produced by the reader
+   * @return a builder configured with a new {@link AvroReadSupport}
+   */
+  public static <T extends IndexedRecord> Builder<T> builder(Path file) {
+    AvroReadSupport<T> avroSupport = new AvroReadSupport<T>();
+    return ParquetReader.builder(avroSupport, file);
+  }
+
+  /**
+   * Opens {@code file} for reading Avro records.
+   *
+   * @param file the Parquet file to read
+   * @throws IOException propagated from the {@link ParquetReader} constructor
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
+  public AvroParquetReader(Path file) throws IOException {
+    super(file, new AvroReadSupport<T>());
+  }
+
+  /**
+   * Opens {@code file} for reading Avro records that pass a record filter.
+   *
+   * @param file the Parquet file to read
+   * @param unboundRecordFilter filter applied to the records being read
+   * @throws IOException propagated from the {@link ParquetReader} constructor
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
+  public AvroParquetReader(Path file, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    super(file, new AvroReadSupport<T>(), unboundRecordFilter);
+  }
+
+  /**
+   * Opens {@code file} for reading Avro records using the given configuration.
+   *
+   * @param conf the Hadoop configuration to read with
+   * @param file the Parquet file to read
+   * @throws IOException propagated from the {@link ParquetReader} constructor
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
+  public AvroParquetReader(Configuration conf, Path file) throws IOException {
+    super(conf, file, new AvroReadSupport<T>());
+  }
+
+  /**
+   * Opens {@code file} for reading filtered Avro records using the given configuration.
+   *
+   * @param conf the Hadoop configuration to read with
+   * @param file the Parquet file to read
+   * @param unboundRecordFilter filter applied to the records being read
+   * @throws IOException propagated from the {@link ParquetReader} constructor
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
+  public AvroParquetReader(Configuration conf, Path file, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    super(conf, file, new AvroReadSupport<T>(), unboundRecordFilter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
new file mode 100644
index 0000000..afa2c6d
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -0,0 +1,106 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * Write Avro records to a Parquet file.
+ * <p>
+ * Every constructor converts the supplied Avro schema to a Parquet schema with
+ * {@link AvroSchemaConverter} and writes records through an {@link AvroWriteSupport}
+ * built from both schemas.
+ *
+ * @param <T> the Avro record type being written
+ */
+public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T> {
+
+  /** Create a new {@link AvroParquetWriter}.
+   *
+   * @param file The file name to write to.
+   * @param avroSchema The schema to write with.
+   * @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
+   * @param blockSize the block size threshold.
+   * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+   * @throws IOException propagated from the {@link ParquetWriter} constructor
+   */
+  public AvroParquetWriter(Path file, Schema avroSchema,
+      CompressionCodecName compressionCodecName, int blockSize,
+      int pageSize) throws IOException {
+    super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
+        compressionCodecName, blockSize, pageSize);
+  }
+
+  /** Create a new {@link AvroParquetWriter}.
+   *
+   * @param file The file name to write to.
+   * @param avroSchema The schema to write with.
+   * @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
+   * @param blockSize the block size threshold.
+   * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+   * @param enableDictionary Whether to use a dictionary to compress columns.
+   * @throws IOException propagated from the {@link ParquetWriter} constructor
+   */
+  public AvroParquetWriter(Path file, Schema avroSchema,
+                           CompressionCodecName compressionCodecName, int blockSize,
+                           int pageSize, boolean enableDictionary) throws IOException {
+    super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
+        compressionCodecName, blockSize, pageSize, enableDictionary,
+        DEFAULT_IS_VALIDATING_ENABLED);
+  }
+
+  /** Create a new {@link AvroParquetWriter} using the defaults inherited from
+   *  {@link ParquetWriter}: a block size of {@link ParquetWriter#DEFAULT_BLOCK_SIZE},
+   *  a page size of {@link ParquetWriter#DEFAULT_PAGE_SIZE}, and no compression.
+   *
+   * @param file The file name to write to.
+   * @param avroSchema The schema to write with.
+   * @throws IOException propagated from the {@link ParquetWriter} constructor
+   */
+  public AvroParquetWriter(Path file, Schema avroSchema) throws IOException {
+    this(file, avroSchema, CompressionCodecName.UNCOMPRESSED,
+        DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  /** Create a new {@link AvroParquetWriter}.
+   *
+   * @param file The file name to write to.
+   * @param avroSchema The schema to write with.
+   * @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
+   * @param blockSize the block size threshold.
+   * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+   *                 Also used as the dictionary page size.
+   * @param enableDictionary Whether to use a dictionary to compress columns.
+   * @param conf The Configuration to use.
+   * @throws IOException propagated from the {@link ParquetWriter} constructor
+   */
+  public AvroParquetWriter(Path file, Schema avroSchema,
+                           CompressionCodecName compressionCodecName,
+                           int blockSize, int pageSize, boolean enableDictionary,
+                           Configuration conf) throws IOException {
+    // pageSize is passed twice: the second occurrence is the dictionary page size
+    super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
+        compressionCodecName, blockSize, pageSize, pageSize, enableDictionary,
+        DEFAULT_IS_VALIDATING_ENABLED, DEFAULT_WRITER_VERSION, conf);
+  }
+
+  /**
+   * Builds the write support shared by all constructors: the Parquet schema is
+   * derived from {@code avroSchema}, and both schemas are handed to
+   * {@link AvroWriteSupport}.
+   */
+  @SuppressWarnings("unchecked") // AvroWriteSupport handles IndexedRecord; callers bind T to a subtype
+  private static <T> WriteSupport<T> writeSupport(Schema avroSchema) {
+    return (WriteSupport<T>) new AvroWriteSupport(
+        new AvroSchemaConverter().convert(avroSchema), avroSchema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
new file mode 100644
index 0000000..9f1ba46
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -0,0 +1,113 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Avro implementation of {@link ReadSupport} for Avro {@link IndexedRecord}s which cover both Avro Specific and
+ * Generic. Users should use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using
+ * this class directly.
+ */
+public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
+
+  public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
+  private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
+
+  static final String AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema";
+  // older files were written with the schema in this metadata key
+  static final String OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema";
+  private static final String AVRO_READ_SCHEMA_METADATA_KEY = "avro.read.schema";
+
+  public static String AVRO_DATA_SUPPLIER = "parquet.avro.data.supplier";
+
+  /**
+   * @see org.apache.parquet.avro.AvroParquetInputFormat#setRequestedProjection(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   */
+  public static void setRequestedProjection(Configuration configuration, Schema requestedProjection) {
+    configuration.set(AVRO_REQUESTED_PROJECTION, requestedProjection.toString());
+  }
+
+  /**
+   * @see org.apache.parquet.avro.AvroParquetInputFormat#setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   */
+  public static void setAvroReadSchema(Configuration configuration, Schema avroReadSchema) {
+    configuration.set(AVRO_READ_SCHEMA, avroReadSchema.toString());
+  }
+
+  public static void setAvroDataSupplier(Configuration configuration,
+      Class<? extends AvroDataSupplier> clazz) {
+    configuration.set(AVRO_DATA_SUPPLIER, clazz.getName());
+  }
+
+  @Override
+  public ReadContext init(Configuration configuration,
+                          Map<String, String> keyValueMetaData,
+                          MessageType fileSchema) {
+    MessageType projection = fileSchema;
+    Map<String, String> metadata = null;
+
+    String requestedProjectionString = configuration.get(AVRO_REQUESTED_PROJECTION);
+    if (requestedProjectionString != null) {
+      Schema avroRequestedProjection = new Schema.Parser().parse(requestedProjectionString);
+      projection = new AvroSchemaConverter(configuration).convert(avroRequestedProjection);
+    }
+    String avroReadSchema = configuration.get(AVRO_READ_SCHEMA);
+    if (avroReadSchema != null) {
+      metadata = new LinkedHashMap<String, String>();
+      metadata.put(AVRO_READ_SCHEMA_METADATA_KEY, avroReadSchema);
+    }
+    return new ReadContext(projection, metadata);
+  }
+
+  @Override
+  public RecordMaterializer<T> prepareForRead(
+      Configuration configuration, Map<String, String> keyValueMetaData,
+      MessageType fileSchema, ReadContext readContext) {
+    MessageType parquetSchema = readContext.getRequestedSchema();
+    Schema avroSchema;
+    if (readContext.getReadSupportMetadata() != null &&
+        readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
+      // use the Avro read schema provided by the user
+      avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY));
+    } else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) {
+      // use the Avro schema from the file metadata if present
+      avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));
+    } else if (keyValueMetaData.get(OLD_AVRO_SCHEMA_METADATA_KEY) != null) {
+      // use the Avro schema from the file metadata if present
+      avroSchema = new Schema.Parser().parse(keyValueMetaData.get(OLD_AVRO_SCHEMA_METADATA_KEY));
+    } else {
+      // default to converting the Parquet schema into an Avro schema
+      avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
+    }
+    Class<? extends AvroDataSupplier> suppClass = configuration.getClass(AVRO_DATA_SUPPLIER,
+        SpecificDataSupplier.class,
+        AvroDataSupplier.class);
+    AvroDataSupplier supplier =ReflectionUtils.newInstance(suppClass, configuration);
+    return new AvroRecordMaterializer<T>(parquetSchema, avroSchema, supplier.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
new file mode 100644
index 0000000..1794929
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
@@ -0,0 +1,46 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * {@link RecordMaterializer} that assembles Avro {@link IndexedRecord}s using an
+ * {@link AvroIndexedRecordConverter} as the root converter.
+ *
+ * @param <T> the Avro record type produced
+ */
+class AvroRecordMaterializer<T extends IndexedRecord> extends RecordMaterializer<T> {
+
+  // root of the converter tree; getCurrentRecord delegates to it
+  private final AvroIndexedRecordConverter<T> root;
+
+  /**
+   * @param requestedSchema the Parquet schema being read
+   * @param avroSchema the Avro schema records are materialized with
+   * @param baseModel the Avro data model handed to the root converter
+   */
+  public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema,
+      GenericData baseModel) {
+    this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema, baseModel);
+  }
+
+  @Override
+  public T getCurrentRecord() {
+    return root.getCurrentRecord();
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return root;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
new file mode 100644
index 0000000..19aa2b3
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -0,0 +1,355 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.util.*;
+
+import org.apache.avro.Schema;
+
+import org.apache.hadoop.conf.Configuration;
+import org.codehaus.jackson.node.NullNode;
+import org.apache.parquet.schema.ConversionPatterns;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.apache.parquet.schema.OriginalType.*;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+
+/**
+ * <p>
+ * Converts an Avro schema into a Parquet schema. See package documentation for details
+ * of the mapping.
+ * </p>
+ */
+public class AvroSchemaConverter {
+
+  static final String ADD_LIST_ELEMENT_RECORDS =
+      "parquet.avro.add-list-element-records";
+  private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
+
+  private final boolean assumeRepeatedIsListElement;
+
+  public AvroSchemaConverter() {
+    this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
+  }
+
+  public AvroSchemaConverter(Configuration conf) {
+    this.assumeRepeatedIsListElement = conf.getBoolean(
+        ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
+  }
+
+  /**
+   * Given a schema, check to see if it is a union of a null type and a regular schema,
+   * and then return the non-null sub-schema. Otherwise, return the given schema.
+   * 
+   * @param schema The schema to check
+   * @return The non-null portion of a union schema, or the given schema
+   */
+  public static Schema getNonNull(Schema schema) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      List<Schema> schemas = schema.getTypes();
+      if (schemas.size() == 2) {
+        if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
+          return schemas.get(1);
+        } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
+          return schemas.get(0);
+        } else {
+          return schema;
+        }
+      } else {
+        return schema;
+      }
+    } else {
+      return schema;
+    }
+  }
+
+  public MessageType convert(Schema avroSchema) {
+    if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
+      throw new IllegalArgumentException("Avro schema must be a record.");
+    }
+    return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields()));
+  }
+
+  private List<Type> convertFields(List<Schema.Field> fields) {
+    List<Type> types = new ArrayList<Type>();
+    for (Schema.Field field : fields) {
+      if (field.schema().getType().equals(Schema.Type.NULL)) {
+        continue; // Avro nulls are not encoded, unless they are null unions
+      }
+      types.add(convertField(field));
+    }
+    return types;
+  }
+
+  private Type convertField(String fieldName, Schema schema) {
+    return convertField(fieldName, schema, Type.Repetition.REQUIRED);
+  }
+
+  private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+    Schema.Type type = schema.getType();
+    if (type.equals(Schema.Type.BOOLEAN)) {
+      return primitive(fieldName, BOOLEAN, repetition);
+    } else if (type.equals(Schema.Type.INT)) {
+      return primitive(fieldName, INT32, repetition);
+    } else if (type.equals(Schema.Type.LONG)) {
+      return primitive(fieldName, INT64, repetition);
+    } else if (type.equals(Schema.Type.FLOAT)) {
+      return primitive(fieldName, FLOAT, repetition);
+    } else if (type.equals(Schema.Type.DOUBLE)) {
+      return primitive(fieldName, DOUBLE, repetition);
+    } else if (type.equals(Schema.Type.BYTES)) {
+      return primitive(fieldName, BINARY, repetition);
+    } else if (type.equals(Schema.Type.STRING)) {
+      return primitive(fieldName, BINARY, repetition, UTF8);
+    } else if (type.equals(Schema.Type.RECORD)) {
+      return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
+    } else if (type.equals(Schema.Type.ENUM)) {
+      return primitive(fieldName, BINARY, repetition, ENUM);
+    } else if (type.equals(Schema.Type.ARRAY)) {
+      return ConversionPatterns.listType(repetition, fieldName,
+          convertField("array", schema.getElementType(), Type.Repetition.REPEATED));
+    } else if (type.equals(Schema.Type.MAP)) {
+      Type valType = convertField("value", schema.getValueType());
+      // avro map key type is always string
+      return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
+    } else if (type.equals(Schema.Type.FIXED)) {
+      return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition,
+                       schema.getFixedSize(), null);
+    } else if (type.equals(Schema.Type.UNION)) {
+      return convertUnion(fieldName, schema, repetition);
+    }
+    throw new UnsupportedOperationException("Cannot convert Avro type " + type);
+  }
+
+  private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
+    List<Schema> nonNullSchemas = new ArrayList(schema.getTypes().size());
+    for (Schema childSchema : schema.getTypes()) {
+      if (childSchema.getType().equals(Schema.Type.NULL)) {
+        if (Type.Repetition.REQUIRED == repetition) {
+          repetition = Type.Repetition.OPTIONAL;
+        }
+      } else {
+        nonNullSchemas.add(childSchema);
+      }
+    }
+    // If we only get a null and one other type then its a simple optional field
+    // otherwise construct a union container
+    switch (nonNullSchemas.size()) {
+      case 0:
+        throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");
+
+      case 1:
+        return convertField(fieldName, nonNullSchemas.get(0), repetition);
+
+      default: // complex union type
+        List<Type> unionTypes = new ArrayList(nonNullSchemas.size());
+        int index = 0;
+        for (Schema childSchema : nonNullSchemas) {
+          unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
+        }
+        return new GroupType(repetition, fieldName, unionTypes);
+    }
+  }
+
+  private Type convertField(Schema.Field field) {
+    return convertField(field.name(), field.schema());
+  }
+
+  private PrimitiveType primitive(String name, 
+      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
+      int typeLength, OriginalType originalType) {
+    return new PrimitiveType(repetition, primitive, typeLength, name,
+                             originalType);
+  }
+
+  private PrimitiveType primitive(String name,
+      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition, 
+      OriginalType originalType) {
+    return new PrimitiveType(repetition, primitive, name, originalType);
+  }
+
+  private PrimitiveType primitive(String name, 
+      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) {
+    return new PrimitiveType(repetition, primitive, name, null);
+  }
+
+  public Schema convert(MessageType parquetSchema) {
+    return convertFields(parquetSchema.getName(), parquetSchema.getFields());
+  }
+
+  private Schema convertFields(String name, List<Type> parquetFields) {
+    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+    for (Type parquetType : parquetFields) {
+      Schema fieldSchema = convertField(parquetType);
+      if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
+        throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
+      } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
+        fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null,
+            NullNode.getInstance()));
+      } else { // REQUIRED
+        fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, null));
+      }
+    }
+    Schema schema = Schema.createRecord(name, null, null, false);
+    schema.setFields(fields);
+    return schema;
+  }
+
+  private Schema convertField(final Type parquetType) {
+    if (parquetType.isPrimitive()) {
+      final PrimitiveTypeName parquetPrimitiveTypeName =
+          parquetType.asPrimitiveType().getPrimitiveTypeName();
+      final OriginalType originalType = parquetType.getOriginalType();
+      return parquetPrimitiveTypeName.convert(
+          new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
+            @Override
+            public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
+              return Schema.create(Schema.Type.BOOLEAN);
+            }
+            @Override
+            public Schema convertINT32(PrimitiveTypeName primitiveTypeName) {
+              return Schema.create(Schema.Type.INT);
+            }
+            @Override
+            public Schema convertINT64(PrimitiveTypeName primitiveTypeName) {
+              return Schema.create(Schema.Type.LONG);
+            }
+            @Override
+            public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
+              throw new IllegalArgumentException("INT64 not yet implemented.");
+            }
+            @Override
+            public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
+              return Schema.create(Schema.Type.FLOAT);
+            }
+            @Override
+            public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
+              return Schema.create(Schema.Type.DOUBLE);
+            }
+            @Override
+            public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
+              int size = parquetType.asPrimitiveType().getTypeLength();
+              return Schema.createFixed(parquetType.getName(), null, null, size);
+            }
+            @Override
+            public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
+              if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
+                return Schema.create(Schema.Type.STRING);
+              } else {
+                return Schema.create(Schema.Type.BYTES);
+              }
+            }
+          });
+    } else {
+      GroupType parquetGroupType = parquetType.asGroupType();
+      OriginalType originalType = parquetGroupType.getOriginalType();
+      if (originalType != null) {
+        switch(originalType) {
+          case LIST:
+            if (parquetGroupType.getFieldCount()!= 1) {
+              throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
+            }
+            Type repeatedType = parquetGroupType.getType(0);
+            if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
+              throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
+            }
+            if (isElementType(repeatedType, parquetGroupType.getName())) {
+              // repeated element types are always required
+              return Schema.createArray(convertField(repeatedType));
+            } else {
+              Type elementType = repeatedType.asGroupType().getType(0);
+              if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
+                return Schema.createArray(optional(convertField(elementType)));
+              } else {
+                return Schema.createArray(convertField(elementType));
+              }
+            }
+          case MAP_KEY_VALUE: // for backward-compatibility
+          case MAP:
+            if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
+              throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
+            }
+            GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
+            if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) ||
+                !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) ||
+                mapKeyValType.getFieldCount()!=2) {
+              throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
+            }
+            Type keyType = mapKeyValType.getType(0);
+            if (!keyType.isPrimitive() ||
+                !keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveTypeName.BINARY) ||
+                !keyType.getOriginalType().equals(OriginalType.UTF8)) {
+              throw new IllegalArgumentException("Map key type must be binary (UTF8): "
+                  + keyType);
+            }
+            Type valueType = mapKeyValType.getType(1);
+            if (valueType.isRepetition(Type.Repetition.OPTIONAL)) {
+              return Schema.createMap(optional(convertField(valueType)));
+            } else {
+              return Schema.createMap(convertField(valueType));
+            }
+          case ENUM:
+            return Schema.create(Schema.Type.STRING);
+          case UTF8:
+          default:
+            throw new UnsupportedOperationException("Cannot convert Parquet type " +
+                parquetType);
+
+        }
+      } else {
+        // if no original type then it's a record
+        return convertFields(parquetGroupType.getName(), parquetGroupType.getFields());
+      }
+    }
+  }
+
+  /**
+   * Implements the rules for interpreting existing data from the logical type
+   * spec for the LIST annotation. This is used to produce the expected schema.
+   * <p>
+   * The AvroArrayConverter will decide whether the repeated type is the array
+   * element type by testing whether the element schema and repeated type are
+   * the same. This ensures that the LIST rules are followed when there is no
+   * schema and that a schema can be provided to override the default behavior.
+   */
+  private boolean isElementType(Type repeatedType, String parentName) {
+    return (
+        // can't be a synthetic layer because it would be invalid
+        repeatedType.isPrimitive() ||
+        repeatedType.asGroupType().getFieldCount() > 1 ||
+        // known patterns without the synthetic layer
+        repeatedType.getName().equals("array") ||
+        repeatedType.getName().equals(parentName + "_tuple") ||
+        // default assumption
+        assumeRepeatedIsListElement
+    );
+  }
+
+  private static Schema optional(Schema original) {
+    // null is first in the union because Parquet's default is always null
+    return Schema.createUnion(Arrays.asList(
+        Schema.create(Schema.Type.NULL),
+        original));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
new file mode 100644
index 0000000..2ec8ee1
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -0,0 +1,234 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Avro implementation of {@link WriteSupport} for {@link IndexedRecord}s - both Avro Generic and Specific.
+ * Users should use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using
+ * this class directly.
+ */
+public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
+
+  /** Configuration key holding the Avro write schema as a JSON string. */
+  static final String AVRO_SCHEMA = "parquet.avro.schema";
+  /** Schema used for every map key; map keys are always written as Avro strings. */
+  private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
+
+  private RecordConsumer recordConsumer;
+  private MessageType rootSchema;
+  private Schema rootAvroSchema;
+
+  /**
+   * Creates a write support whose schemas are read from the
+   * {@code parquet.avro.schema} configuration property in {@link #init(Configuration)}.
+   */
+  public AvroWriteSupport() {
+  }
+
+  /**
+   * Creates a write support with explicit schemas; the configuration is then not
+   * consulted for a schema.
+   *
+   * @param schema the Parquet schema to write
+   * @param avroSchema the Avro schema of the records passed to {@link #write(IndexedRecord)}
+   */
+  public AvroWriteSupport(MessageType schema, Schema avroSchema) {
+    this.rootSchema = schema;
+    this.rootAvroSchema = avroSchema;
+  }
+
+  /**
+   * Stores the JSON form of an Avro write schema in the given configuration.
+   *
+   * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   */
+  public static void setSchema(Configuration configuration, Schema schema) {
+    configuration.set(AVRO_SCHEMA, schema.toString());
+  }
+
+  /**
+   * Resolves the schemas (from the configuration if none were given to the
+   * constructor) and records the Avro schema in the file's extra metadata.
+   *
+   * @throws IllegalStateException if no schema was given to the constructor and
+   *     none is set under {@code parquet.avro.schema}
+   */
+  @Override
+  public WriteContext init(Configuration configuration) {
+    if (rootAvroSchema == null) {
+      String schemaJson = configuration.get(AVRO_SCHEMA);
+      if (schemaJson == null) {
+        // Fail with a clear message instead of an NPE from the schema parser.
+        throw new IllegalStateException("No Avro write schema configured: set "
+            + AVRO_SCHEMA + " (e.g. with AvroWriteSupport.setSchema)");
+      }
+      rootAvroSchema = new Schema.Parser().parse(schemaJson);
+      rootSchema = new AvroSchemaConverter().convert(rootAvroSchema);
+    }
+    Map<String, String> extraMetaData = new HashMap<String, String>();
+    extraMetaData.put(AvroReadSupport.AVRO_SCHEMA_METADATA_KEY, rootAvroSchema.toString());
+    return new WriteContext(rootSchema, extraMetaData);
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    this.recordConsumer = recordConsumer;
+  }
+
+  /** Writes one top-level record as a Parquet message. */
+  @Override
+  public void write(IndexedRecord record) {
+    recordConsumer.startMessage();
+    writeRecordFields(rootSchema, rootAvroSchema, record);
+    recordConsumer.endMessage();
+  }
+
+  /** Writes a nested record as a Parquet group. */
+  private void writeRecord(GroupType schema, Schema avroSchema,
+                           IndexedRecord record) {
+    recordConsumer.startGroup();
+    writeRecordFields(schema, avroSchema, record);
+    recordConsumer.endGroup();
+  }
+
+  /**
+   * Writes every non-null field of a record. Avro fields of type null have no
+   * Parquet counterpart, so the Parquet field index only advances for the others.
+   *
+   * @throws RuntimeException if a REQUIRED Parquet field has a null value
+   */
+  private void writeRecordFields(GroupType schema, Schema avroSchema,
+                                 IndexedRecord record) {
+    List<Type> fields = schema.getFields();
+    List<Schema.Field> avroFields = avroSchema.getFields();
+    int index = 0; // parquet ignores Avro nulls, so index may differ
+    for (int avroIndex = 0; avroIndex < avroFields.size(); avroIndex++) {
+      Schema.Field avroField = avroFields.get(avroIndex);
+      if (avroField.schema().getType().equals(Schema.Type.NULL)) {
+        continue;
+      }
+      Type fieldType = fields.get(index);
+      Object value = record.get(avroIndex);
+      if (value != null) {
+        recordConsumer.startField(fieldType.getName(), index);
+        writeValue(fieldType, avroField.schema(), value);
+        recordConsumer.endField(fieldType.getName(), index);
+      } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+        throw new RuntimeException("Null-value for required field: " + avroField.name());
+      }
+      index++;
+    }
+  }
+
+  /**
+   * Writes an Avro array as a LIST-annotated group whose single repeated field
+   * ("array") holds the elements. An empty array writes an empty group.
+   *
+   * @throws RuntimeException if the array contains a null element, which a
+   *     repeated Parquet field cannot represent
+   */
+  private <T> void writeArray(GroupType schema, Schema avroSchema,
+                              Collection<T> array) {
+    recordConsumer.startGroup(); // group wrapper (original type LIST)
+    if (array.size() > 0) {
+      Type elementType = schema.getType(0);
+      Schema avroElementSchema = avroSchema.getElementType();
+      recordConsumer.startField("array", 0);
+      int elementIndex = 0;
+      for (T elt : array) {
+        if (elt == null) {
+          // Without this check a null fails later with an unrelated NPE/ClassCast.
+          throw new RuntimeException("Null element at index " + elementIndex
+              + " of array: repeated Parquet fields cannot store nulls");
+        }
+        writeValue(elementType, avroElementSchema, elt);
+        elementIndex++;
+      }
+      recordConsumer.endField("array", 0);
+    }
+    recordConsumer.endGroup();
+  }
+
+  /**
+   * Writes an Avro map as a MAP-annotated group containing a repeated
+   * key/value group per entry. A null value is skipped when the Parquet value
+   * field is OPTIONAL.
+   *
+   * @throws RuntimeException if a value is null and the value field is not OPTIONAL
+   */
+  private <V> void writeMap(GroupType schema, Schema avroSchema,
+                            Map<CharSequence, V> map) {
+    GroupType innerGroup = schema.getType(0).asGroupType();
+    Type keyType = innerGroup.getType(0);
+    Type valueType = innerGroup.getType(1);
+
+    recordConsumer.startGroup(); // group wrapper (original type MAP)
+    if (map.size() > 0) {
+      recordConsumer.startField("map", 0);
+
+      for (Map.Entry<CharSequence, V> entry : map.entrySet()) {
+        recordConsumer.startGroup(); // repeated group key_value, middle layer
+        recordConsumer.startField("key", 0);
+        writeValue(keyType, MAP_KEY_SCHEMA, entry.getKey());
+        recordConsumer.endField("key", 0);
+        V value = entry.getValue();
+        if (value != null) {
+          recordConsumer.startField("value", 1);
+          writeValue(valueType, avroSchema.getValueType(), value);
+          recordConsumer.endField("value", 1);
+        } else if (!valueType.isRepetition(Type.Repetition.OPTIONAL)) {
+          // avroSchema.getName() is only the type name for maps; report the key instead.
+          throw new RuntimeException("Null map value for key: " + entry.getKey());
+        }
+        recordConsumer.endGroup();
+      }
+
+      recordConsumer.endField("map", 0);
+    }
+    recordConsumer.endGroup();
+  }
+
+  /**
+   * Writes a union value as a group with one field per non-null union member,
+   * named "member0", "member1", ...; only the matching member's field is written.
+   */
+  private void writeUnion(GroupType parquetSchema, Schema avroSchema,
+                          Object value) {
+    recordConsumer.startGroup();
+
+    // resolveUnion tells us which of the union member types the value belongs to.
+    int avroIndex = GenericData.get().resolveUnion(avroSchema, value);
+
+    // Parquet's schema skips null members, so shift the index down by the
+    // number of null members that precede the resolved one.
+    int parquetIndex = avroIndex;
+    for (int i = 0; i < avroIndex; i++) {
+      if (avroSchema.getTypes().get(i).getType().equals(Schema.Type.NULL)) {
+        parquetIndex--;
+      }
+    }
+
+    // Sparsely populated method of encoding unions, each member has its own
+    // set of columns.
+    String memberName = "member" + parquetIndex;
+    recordConsumer.startField(memberName, parquetIndex);
+    writeValue(parquetSchema.getType(parquetIndex),
+               avroSchema.getTypes().get(avroIndex), value);
+    recordConsumer.endField(memberName, parquetIndex);
+
+    recordConsumer.endGroup();
+  }
+
+  /**
+   * Writes a single non-null value according to its Avro schema, after passing
+   * the schema through AvroSchemaConverter.getNonNull (presumably unwrapping a
+   * nullable union -- confirm against that method). A schema of type NULL
+   * matches no branch and writes nothing.
+   */
+  @SuppressWarnings("unchecked")
+  private void writeValue(Type type, Schema avroSchema, Object value) {
+    Schema nonNullAvroSchema = AvroSchemaConverter.getNonNull(avroSchema);
+    Schema.Type avroType = nonNullAvroSchema.getType();
+    if (avroType.equals(Schema.Type.BOOLEAN)) {
+      recordConsumer.addBoolean((Boolean) value);
+    } else if (avroType.equals(Schema.Type.INT)) {
+      recordConsumer.addInteger(((Number) value).intValue());
+    } else if (avroType.equals(Schema.Type.LONG)) {
+      recordConsumer.addLong(((Number) value).longValue());
+    } else if (avroType.equals(Schema.Type.FLOAT)) {
+      recordConsumer.addFloat(((Number) value).floatValue());
+    } else if (avroType.equals(Schema.Type.DOUBLE)) {
+      recordConsumer.addDouble(((Number) value).doubleValue());
+    } else if (avroType.equals(Schema.Type.BYTES)) {
+      recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) value));
+    } else if (avroType.equals(Schema.Type.STRING)) {
+      recordConsumer.addBinary(fromAvroString(value));
+    } else if (avroType.equals(Schema.Type.RECORD)) {
+      writeRecord((GroupType) type, nonNullAvroSchema, (IndexedRecord) value);
+    } else if (avroType.equals(Schema.Type.ENUM)) {
+      recordConsumer.addBinary(Binary.fromString(value.toString()));
+    } else if (avroType.equals(Schema.Type.ARRAY)) {
+      writeArray((GroupType) type, nonNullAvroSchema, (Collection<?>) value);
+    } else if (avroType.equals(Schema.Type.MAP)) {
+      writeMap((GroupType) type, nonNullAvroSchema, (Map<CharSequence, ?>) value);
+    } else if (avroType.equals(Schema.Type.UNION)) {
+      writeUnion((GroupType) type, nonNullAvroSchema, value);
+    } else if (avroType.equals(Schema.Type.FIXED)) {
+      recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes()));
+    }
+  }
+
+  /**
+   * Converts an Avro string value to a Binary, using the bytes of a {@link Utf8}
+   * directly (only the first getByteLength() bytes) and falling back to
+   * {@code toString()} for any other CharSequence.
+   */
+  private Binary fromAvroString(Object value) {
+    if (value instanceof Utf8) {
+      Utf8 utf8 = (Utf8) value;
+      return Binary.fromByteArray(utf8.getBytes(), 0, utf8.getByteLength());
+    }
+    return Binary.fromString(value.toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java b/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java
new file mode 100644
index 0000000..2bd2599
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java
@@ -0,0 +1,29 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+
+class SpecificDataSupplier implements AvroDataSupplier {
+  @Override
+  public GenericData get() {
+    return SpecificData.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java b/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java
new file mode 100644
index 0000000..e5e0475
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java
@@ -0,0 +1,171 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ *
+ * <p>
+ * Provides classes to store Avro data in Parquet files. Avro schemas are converted to
+ * Parquet schemas as follows. Only record schemas are converted,
+ * other top-level schema types are not converted and attempting to do so will result
+ * in an error. Avro types are converted to Parquet types using the mapping shown here:
+ * </p>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Avro type</th>
+ *     <th>Parquet type</th>
+ *   </tr>
+ *   <tr>
+ *     <td>null</td>
+ *     <td>no type (the field is not encoded in Parquet), unless a null union</td>
+ *   </tr>
+ *   <tr>
+ *     <td>boolean</td>
+ *     <td>boolean</td>
+ *   </tr>
+ *   <tr>
+ *     <td>int</td>
+ *     <td>int32</td>
+ *   </tr>
+ *   <tr>
+ *     <td>long</td>
+ *     <td>int64</td>
+ *   </tr>
+ *   <tr>
+ *     <td>float</td>
+ *     <td>float</td>
+ *   </tr>
+ *   <tr>
+ *     <td>double</td>
+ *     <td>double</td>
+ *   </tr>
+ *   <tr>
+ *     <td>bytes</td>
+ *     <td>binary</td>
+ *   </tr>
+ *   <tr>
+ *     <td>string</td>
+ *     <td>binary (with original type UTF8)</td>
+ *   </tr>
+ *   <tr>
+ *     <td>record</td>
+ *     <td>group containing nested fields</td>
+ *   </tr>
+ *   <tr>
+ *     <td>enum</td>
+ *     <td>binary (with original type ENUM)</td>
+ *   </tr>
+ *   <tr>
+ *     <td>array</td>
+ *     <td>group (with original type LIST) containing one repeated field named array</td>
+ *   </tr>
+ *   <tr>
+ *     <td>map</td>
+ *     <td>group (with original type MAP) containing one repeated group
+ *     field (with original type MAP_KEY_VALUE) of (key, value)</td>
+ *   </tr>
+ *   <tr>
+ *     <td>fixed</td>
+ *     <td>fixed_len_byte_array</td>
+ *   </tr>
+ *   <tr>
+ *     <td>union</td>
+ *     <td>an optional type in the case of a null union; otherwise a group with one field
+ *     per non-null member, named member0, member1, ...</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>
+ * For Parquet files that were not written with classes from this package there is no
+ * Avro write schema stored in the Parquet file metadata. To read such files using
+ * classes from this package you can either provide an Avro read schema,
+ * or let a default Avro schema be derived using the following mapping:
+ * </p>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Parquet type</th>
+ *     <th>Avro type</th>
+ *   </tr>
+ *   <tr>
+ *     <td>boolean</td>
+ *     <td>boolean</td>
+ *   </tr>
+ *   <tr>
+ *     <td>int32</td>
+ *     <td>int</td>
+ *   </tr>
+ *   <tr>
+ *     <td>int64</td>
+ *     <td>long</td>
+ *   </tr>
+ *   <tr>
+ *     <td>int96</td>
+ *     <td>not supported</td>
+ *   </tr>
+ *   <tr>
+ *     <td>float</td>
+ *     <td>float</td>
+ *   </tr>
+ *   <tr>
+ *     <td>double</td>
+ *     <td>double</td>
+ *   </tr>
+ *   <tr>
+ *     <td>fixed_len_byte_array</td>
+ *     <td>fixed</td>
+ *   </tr>
+ *   <tr>
+ *     <td>binary (with no original type)</td>
+ *     <td>bytes</td>
+ *   </tr>
+ *   <tr>
+ *     <td>binary (with original type UTF8)</td>
+ *     <td>string</td>
+ *   </tr>
+ *   <tr>
+ *     <td>binary (with original type ENUM)</td>
+ *     <td>string</td>
+ *   </tr>
+ *   <tr>
+ *     <td>group (with original type LIST) containing one repeated group field</td>
+ *     <td>array</td>
+ *   </tr>
+ *   <tr>
+ *     <td>group (with original type MAP) containing one repeated group
+ *     field (with original type MAP_KEY_VALUE) of (key, value)</td>
+ *     <td>map</td>
+ *   </tr>
+ * </table>
+ *
+ * <p>
+ * Parquet fields that are optional are mapped to an Avro null union.
+ * </p>
+ *
+ * <p>
+ * Some conversions are lossy. Avro nulls are not represented in Parquet,
+ * so they are lost when converted back to Avro. Similarly, a Parquet enum does not
+ * store its symbols, so it cannot be converted back to an Avro enum and is read
+ * back as an Avro string instead. Type names for nested records, enums,
+ * and fixed types are lost in the conversion to Parquet.
+ * Avro aliases, default values, field ordering, and documentation strings are all
+ * dropped in the conversion to Parquet.
+ * </p>
+ *
+ * <p>
+ * Parquet maps can have keys of any type, but Avro map keys are always strings.
+ * </p>
+ */
+package org.apache.parquet.avro;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java b/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
deleted file mode 100644
index c6c04ab..0000000
--- a/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.avro;
-
-import org.apache.avro.generic.GenericData;
-
-/**
- * Allows clients to control how the classes associated
- * with specific Avro records are managed and found, e.g.,
- * by creating an instance of {@code GenericData} that
- * uses a particular {@code ClassLoader}.
- */
-public interface AvroDataSupplier {
-  /**
-   * @return the {@code GenericData} instance supplied by this implementation
-   */
-  GenericData get();
-}


Mime
View raw message