parquet-commits mailing list archives

From b...@apache.org
Subject [2/2] parquet-mr git commit: PARQUET-243: Add Avro reflect support
Date Mon, 18 May 2015 17:08:49 GMT
PARQUET-243: Add Avro reflect support

Author: Ryan Blue <blue@apache.org>

Closes #165 from rdblue/PARQUET-243-add-avro-reflect and squashes the following commits:

a1a17b4 [Ryan Blue] PARQUET-243: Update for Tom's review comments.
16584d1 [Ryan Blue] PARQUET-243: Fix AvroWriteSupport bug.
fa4a9ec [Ryan Blue] PARQUET-243: Add reflect tests.
4c50cd1 [Ryan Blue] PARQUET-243: Update write support for reflected objects.
b50c482 [Ryan Blue] PARQUET-243: Update tests to run with new converters.
0b7a333 [Ryan Blue] PARQUET-243: Use common AvroConverters where possible.
2f6825d [Ryan Blue] PARQUET-243: Add reflect converters that behave more like Avro.
98f10df [Ryan Blue] PARQUET-243: Add Avro compatible record materializer.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/a458e1a2
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/a458e1a2
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/a458e1a2

Branch: refs/heads/master
Commit: a458e1a2f3cd1ccd692f1530b64d3143c9beda51
Parents: 60edcf9
Author: Ryan Blue <blue@apache.org>
Authored: Mon May 18 10:08:32 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Mon May 18 10:08:32 2015 -0700

----------------------------------------------------------------------
 parquet-avro/pom.xml                            |   5 +
 .../avro/AvroCompatRecordMaterializer.java      |  46 ++
 .../org/apache/parquet/avro/AvroConverters.java | 253 ++++++
 .../avro/AvroIndexedRecordConverter.java        | 229 +----
 .../parquet/avro/AvroParquetOutputFormat.java   |  17 +-
 .../apache/parquet/avro/AvroParquetReader.java  |   5 +-
 .../apache/parquet/avro/AvroParquetWriter.java  | 141 +++-
 .../apache/parquet/avro/AvroReadSupport.java    |  54 +-
 .../parquet/avro/AvroRecordConverter.java       | 827 +++++++++++++++++++
 .../parquet/avro/AvroRecordMaterializer.java    |   7 +-
 .../apache/parquet/avro/AvroWriteSupport.java   | 236 +++++-
 .../parquet/avro/GenericDataSupplier.java       |  28 +
 .../parquet/avro/ParentValueContainer.java      |  63 ++
 .../parquet/avro/ReflectDataSupplier.java       |  29 +
 .../parquet/avro/TestBackwardCompatibility.java |  18 +-
 .../org/apache/parquet/avro/TestReadWrite.java  |  41 +-
 .../avro/TestReflectInputOutputFormat.java      | 495 +++++++++++
 .../parquet/avro/TestReflectReadWrite.java      | 215 +++++
 .../parquet/avro/TestSpecificReadWrite.java     |  45 +-
 parquet-column/pom.xml                          |   2 +-
 pom.xml                                         |   1 +
 21 files changed, 2482 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 6a5288b..0b84fa6 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -58,6 +58,11 @@
       <version>${avro.version}</version>
     </dependency>
     <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>${fastutil.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java
new file mode 100644
index 0000000..46059e8
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java
@@ -0,0 +1,46 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+class AvroCompatRecordMaterializer<T extends IndexedRecord> extends RecordMaterializer<T> {
+
+  private AvroIndexedRecordConverter<T> root;
+
+  public AvroCompatRecordMaterializer(MessageType requestedSchema, Schema avroSchema,
+                                      GenericData baseModel) {
+    this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema, baseModel);
+  }
+
+  @Override
+  public T getCurrentRecord() {
+    return root.getCurrentRecord();
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return root;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
new file mode 100644
index 0000000..f3cb1ec
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+
+public class AvroConverters {
+
+  public abstract static class AvroGroupConverter extends GroupConverter {
+    protected final ParentValueContainer parent;
+
+    public AvroGroupConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+  }
+
+  static class AvroPrimitiveConverter extends PrimitiveConverter {
+    protected final ParentValueContainer parent;
+
+    public AvroPrimitiveConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+  }
+
+  static final class FieldByteConverter extends AvroPrimitiveConverter {
+    public FieldByteConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.addByte((byte) value);
+    }
+  }
+  static final class FieldShortConverter extends AvroPrimitiveConverter {
+    public FieldShortConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.addShort((short) value);
+    }
+  }
+
+  static final class FieldCharConverter extends AvroPrimitiveConverter {
+    public FieldCharConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.addChar((char) value);
+    }
+  }
+
+  static final class FieldBooleanConverter extends AvroPrimitiveConverter {
+    public FieldBooleanConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    final public void addBoolean(boolean value) {
+      parent.addBoolean(value);
+    }
+  }
+
+  static final class FieldIntegerConverter extends AvroPrimitiveConverter {
+    public FieldIntegerConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.addInt(value);
+    }
+  }
+
+  static final class FieldLongConverter extends AvroPrimitiveConverter {
+    public FieldLongConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.addLong((long) value);
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.addLong(value);
+    }
+  }
+
+  static final class FieldFloatConverter extends AvroPrimitiveConverter {
+    public FieldFloatConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.addFloat((float) value);
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.addFloat((float) value);
+    }
+
+    @Override
+    final public void addFloat(float value) {
+      parent.addFloat(value);
+    }
+
+  }
+
+  static final class FieldDoubleConverter extends AvroPrimitiveConverter {
+    public FieldDoubleConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.addDouble((double) value);
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.addDouble((double) value);
+    }
+
+    @Override
+    final public void addFloat(float value) {
+      parent.addDouble((double) value);
+    }
+
+    @Override
+    final public void addDouble(double value) {
+      parent.addDouble(value);
+    }
+  }
+
+  static final class FieldByteArrayConverter extends AvroPrimitiveConverter {
+    public FieldByteArrayConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(value.getBytes());
+    }
+  }
+
+  static final class FieldByteBufferConverter extends AvroPrimitiveConverter {
+    public FieldByteBufferConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(ByteBuffer.wrap(value.getBytes()));
+    }
+  }
+
+  static final class FieldStringConverter extends AvroPrimitiveConverter {
+    // TODO: dictionary support should be generic and provided by a parent
+    // TODO: this always produces strings, but should respect avro.java.string
+    private String[] dict;
+
+    public FieldStringConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(value.toStringUsingUTF8());
+    }
+
+    @Override
+    public boolean hasDictionarySupport() {
+      return true;
+    }
+
+    @Override
+    public void setDictionary(Dictionary dictionary) {
+      dict = new String[dictionary.getMaxId() + 1];
+      for (int i = 0; i <= dictionary.getMaxId(); i++) {
+        dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
+      }
+    }
+
+    @Override
+    public void addValueFromDictionary(int dictionaryId) {
+      parent.add(dict[dictionaryId]);
+    }
+  }
+
+  static final class FieldEnumConverter extends AvroPrimitiveConverter {
+    private final Schema schema;
+    private final GenericData model;
+
+    public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema,
+                              GenericData model) {
+      super(parent);
+      this.schema = enumSchema;
+      this.model = model;
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(model.createEnum(value.toStringUsingUTF8(), schema));
+    }
+  }
+
+  static final class FieldFixedConverter extends AvroPrimitiveConverter {
+    private final Schema schema;
+    private final GenericData model;
+
+    public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema,
+                               GenericData model) {
+      super(parent);
+      this.schema = avroSchema;
+      this.model = model;
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(model.createFixed(null /* reuse */, value.getBytes(), schema));
+    }
+  }
+}
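
Note on the pattern above: each converter decodes a Parquet primitive and
immediately pushes the result to the ParentValueContainer supplied by the
enclosing group converter, rather than buffering it. A minimal sketch of
that callback, assuming the package-private ParentValueContainer added by
this commit (its typed add* methods are presumed to box and delegate to
add(Object) unless overridden):

    // hypothetical container that just prints whatever it receives
    ParentValueContainer printer = new ParentValueContainer() {
      @Override
      public void add(Object value) {
        System.out.println("decoded: " + value);
      }
    };

    // FieldStringConverter decodes the binary as UTF-8, then hands the
    // resulting String to its parent
    PrimitiveConverter c = new AvroConverters.FieldStringConverter(printer);
    c.addBinary(Binary.fromString("hello"));  // prints: decoded: hello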

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index f76f367..262c423 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -19,7 +19,6 @@
 package org.apache.parquet.avro;
 
 import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.avro.Schema;
@@ -28,7 +27,6 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificData;
 import org.apache.parquet.Preconditions;
-import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
@@ -36,9 +34,15 @@ import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.Type;
 
+/**
+ * This {@link Converter} class materializes records as Avro
+ * {@link IndexedRecord} instances. This is replaced by
+ * {@link AvroRecordConverter}, but is included for backward-compatibility.
+ *
+ * @param <T> a subclass of Avro's IndexedRecord
+ */
 class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter {
 
   private final ParentValueContainer parent;
@@ -71,8 +75,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     this.avroSchema = avroSchema;
     int schemaSize = parquetSchema.getFieldCount();
     this.converters = new Converter[schemaSize];
-    this.specificClass = baseModel instanceof SpecificData ?
-        ((SpecificData) baseModel).getClass(avroSchema) : null;
+    this.specificClass = getDatumClass(baseModel, avroSchema);
 
     this.model = this.specificClass == null ? GenericData.get() : baseModel;
 
@@ -88,7 +91,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
       final int finalAvroIndex = avroFieldIndexes.remove(avroField.name());
       converters[parquetFieldIndex++] = newConverter(nonNullSchema, parquetField, model, new ParentValueContainer() {
         @Override
-        void add(Object value) {
+        public void add(Object value) {
           AvroIndexedRecordConverter.this.set(finalAvroIndex, value);
         }
       });
@@ -106,6 +109,14 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     }
   }
 
+  @SuppressWarnings("unchecked")
+  private static <T> Class<T> getDatumClass(GenericData model, Schema schema) {
+    if (model instanceof SpecificData) {
+      return (Class<T>) ((SpecificData) model).getClass(schema);
+    }
+    return null;
+  }
+
   private Schema.Field getAvroField(String parquetFieldName) {
     Schema.Field avroField = avroSchema.getField(parquetFieldName);
     for (Schema.Field f : avroSchema.getFields()) {
@@ -123,19 +134,19 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
   private static Converter newConverter(Schema schema, Type type,
       GenericData model, ParentValueContainer parent) {
     if (schema.getType().equals(Schema.Type.BOOLEAN)) {
-      return new FieldBooleanConverter(parent);
+      return new AvroConverters.FieldBooleanConverter(parent);
     } else if (schema.getType().equals(Schema.Type.INT)) {
-      return new FieldIntegerConverter(parent);
+      return new AvroConverters.FieldIntegerConverter(parent);
     } else if (schema.getType().equals(Schema.Type.LONG)) {
-      return new FieldLongConverter(parent);
+      return new AvroConverters.FieldLongConverter(parent);
     } else if (schema.getType().equals(Schema.Type.FLOAT)) {
-      return new FieldFloatConverter(parent);
+      return new AvroConverters.FieldFloatConverter(parent);
     } else if (schema.getType().equals(Schema.Type.DOUBLE)) {
-      return new FieldDoubleConverter(parent);
+      return new AvroConverters.FieldDoubleConverter(parent);
     } else if (schema.getType().equals(Schema.Type.BYTES)) {
-      return new FieldBytesConverter(parent);
+      return new AvroConverters.FieldByteBufferConverter(parent);
     } else if (schema.getType().equals(Schema.Type.STRING)) {
-      return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8);
+      return new AvroConverters.FieldStringConverter(parent);
     } else if (schema.getType().equals(Schema.Type.RECORD)) {
       return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema, model);
     } else if (schema.getType().equals(Schema.Type.ENUM)) {
@@ -163,11 +174,12 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void start() {
     // Should do the right thing whether it is generic or specific
     this.currentRecord = (T) ((this.specificClass == null) ?
             new GenericData.Record(avroSchema) :
-            ((SpecificData) model).newInstance(specificClass, avroSchema));
+            SpecificData.newInstance(specificClass, avroSchema));
   }
 
   @Override
@@ -204,170 +216,6 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     return currentRecord;
   }
 
-  static abstract class ParentValueContainer {
-
-    /**
-     * Adds the value to the parent.
-     */
-    abstract void add(Object value);
-
-  }
-
-  static final class FieldBooleanConverter extends PrimitiveConverter {
-
-    private final ParentValueContainer parent;
-
-    public FieldBooleanConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBoolean(boolean value) {
-      parent.add(value);
-    }
-
-  }
-
-  static final class FieldIntegerConverter extends PrimitiveConverter {
-
-    private final ParentValueContainer parent;
-
-    public FieldIntegerConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(value);
-    }
-
-  }
-
-  static final class FieldLongConverter extends PrimitiveConverter {
-
-    private final ParentValueContainer parent;
-
-    public FieldLongConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(Long.valueOf(value));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(value);
-    }
-
-  }
-
-  static final class FieldFloatConverter extends PrimitiveConverter {
-
-    private final ParentValueContainer parent;
-
-    public FieldFloatConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(Float.valueOf(value));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(Float.valueOf(value));
-    }
-
-    @Override
-    final public void addFloat(float value) {
-      parent.add(value);
-    }
-
-  }
-
-  static final class FieldDoubleConverter extends PrimitiveConverter {
-
-    private final ParentValueContainer parent;
-
-    public FieldDoubleConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(Double.valueOf(value));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(Double.valueOf(value));
-    }
-
-    @Override
-    final public void addFloat(float value) {
-      parent.add(Double.valueOf(value));
-    }
-
-    @Override
-    final public void addDouble(double value) {
-      parent.add(value);
-    }
-
-  }
-
-  static final class FieldBytesConverter extends PrimitiveConverter {
-
-    private final ParentValueContainer parent;
-
-    public FieldBytesConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(ByteBuffer.wrap(value.getBytes()));
-    }
-
-  }
-
-  static final class FieldStringConverter extends PrimitiveConverter {
-
-    private final ParentValueContainer parent;
-    private final boolean dictionarySupport;
-    private String[] dict;
-
-    public FieldStringConverter(ParentValueContainer parent, boolean dictionarySupport) {
-      this.parent = parent;
-      this.dictionarySupport = dictionarySupport;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(value.toStringUsingUTF8());
-    }
-
-    @Override
-    public boolean hasDictionarySupport() {
-      return dictionarySupport;
-    }
-
-    @Override
-    public void setDictionary(Dictionary dictionary) {
-      dict = new String[dictionary.getMaxId() + 1];
-      for (int i = 0; i <= dictionary.getMaxId(); i++) {
-        dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
-      }
-    }
-
-    @Override
-    public void addValueFromDictionary(int dictionaryId) {
-      parent.add(dict[dictionaryId]);
-    }
-  }
-
   static final class FieldEnumConverter extends PrimitiveConverter {
 
     private final ParentValueContainer parent;
@@ -448,15 +296,13 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
    * </pre>
    *
    * This class also implements LIST element backward-compatibility rules.
-   *
-   * @param <T> The type of elements in the list
    */
-  static final class AvroArrayConverter<T> extends GroupConverter {
+  static final class AvroArrayConverter extends GroupConverter {
 
     private final ParentValueContainer parent;
     private final Schema avroSchema;
     private final Converter converter;
-    private GenericArray<T> array;
+    private GenericArray<Object> array;
 
     public AvroArrayConverter(ParentValueContainer parent, GroupType type,
         Schema avroSchema, GenericData model) {
@@ -471,8 +317,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
         converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
           @Override
           @SuppressWarnings("unchecked")
-          void add(Object value) {
-            array.add((T) value);
+          public void add(Object value) {
+            array.add(value);
           }
         });
       } else {
@@ -488,7 +334,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
 
     @Override
     public void start() {
-      array = new GenericData.Array<T>(0, avroSchema);
+      array = new GenericData.Array<Object>(0, avroSchema);
     }
 
     @Override
@@ -539,7 +385,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
      * </pre>
      */
     final class ElementConverter extends GroupConverter {
-      private T element;
+      private Object element;
       private final Converter elementConverter;
 
       public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) {
@@ -547,9 +393,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
         Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
         this.elementConverter = newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() {
           @Override
-          @SuppressWarnings("unchecked")
-          void add(Object value) {
-            ElementConverter.this.element = (T) value;
+          public void add(Object value) {
+            ElementConverter.this.element = value;
           }
         });
       }
@@ -573,7 +418,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     }
   }
 
-  static final class AvroUnionConverter<T> extends GroupConverter {
+  static final class AvroUnionConverter extends GroupConverter {
 
     private final ParentValueContainer parent;
     private final Converter[] memberConverters;
@@ -592,7 +437,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
           Type memberType = parquetGroup.getType(parquetIndex);
           memberConverters[parquetIndex] = newConverter(memberSchema, memberType, model, new ParentValueContainer() {
             @Override
-            void add(Object value) {
+            public void add(Object value) {
               Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type");
               memberValue = value;
             }
@@ -668,7 +513,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
         valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() {
           @Override
           @SuppressWarnings("unchecked")
-          void add(Object value) {
+          public void add(Object value) {
             MapKeyValueConverter.this.value = (V) value;
           }
         });

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
index afca74f..1eb4f93 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.util.ContextUtil;
 /**
  * A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files.
  */
-public class AvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> {
+public class AvroParquetOutputFormat<T> extends ParquetOutputFormat<T> {
 
   /**
    * Set the Avro schema to use for writing. The schema is translated into a Parquet
@@ -44,7 +44,20 @@ public class AvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord>
   }
 
   public AvroParquetOutputFormat() {
-    super(new AvroWriteSupport());
+    super(new AvroWriteSupport<T>());
   }
 
+  /**
+   * Sets the {@link AvroDataSupplier} class that will be used. The data
+   * supplier provides instances of {@link org.apache.avro.generic.GenericData}
+   * that are used to deconstruct records.
+   *
+   * @param job a {@link Job} to configure
+   * @param supplierClass a supplier class
+   */
+  public static void setAvroDataSupplier(
+      Job job, Class<? extends AvroDataSupplier> supplierClass) {
+    AvroWriteSupport.setAvroDataSupplier(ContextUtil.getConfiguration(job),
+        supplierClass);
+  }
 }
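
A hedged sketch of configuring this output format for reflected objects
in a MapReduce job, assuming Hadoop 2's Job.getInstance and the
hypothetical Pojo class from above:

    Job job = Job.getInstance(new Configuration());
    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job,
        ReflectData.get().getSchema(Pojo.class));
    // deconstruct records with ReflectData rather than the
    // specific/generic models
    AvroParquetOutputFormat.setAvroDataSupplier(job, ReflectDataSupplier.class);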

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
index 40cf5eb..c4a010c 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
@@ -20,7 +20,6 @@ package org.apache.parquet.avro;
 
 import java.io.IOException;
 
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -30,9 +29,9 @@ import org.apache.parquet.hadoop.ParquetReader;
 /**
  * Read Avro records from a Parquet file.
  */
-public class AvroParquetReader<T extends IndexedRecord> extends ParquetReader<T> {
+public class AvroParquetReader<T> extends ParquetReader<T> {
 
-  public static <T extends IndexedRecord> Builder<T> builder(Path file) {
+  public static <T> Builder<T> builder(Path file) {
     return ParquetReader.builder(new AvroReadSupport<T>(), file);
   }
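
With the IndexedRecord bound gone, the reader can now materialize
reflected objects directly. A sketch using the hypothetical Pojo from
above; it assumes withConf on the existing ParquetReader builder and uses
the configuration keys shown in the AvroReadSupport diff below:

    Configuration conf = new Configuration();
    // supply ReflectData on the read path
    conf.setClass(AvroReadSupport.AVRO_DATA_SUPPLIER,
        ReflectDataSupplier.class, AvroDataSupplier.class);
    // opt out of the IndexedRecord compatibility default
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);

    ParquetReader<Pojo> reader = AvroParquetReader.<Pojo>builder(file)
        .withConf(conf)
        .build();
    Pojo pojo;
    while ((pojo = reader.read()) != null) {
      // process pojo
    }
    reader.close();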
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
index afa2c6d..7abd39a 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -20,9 +20,11 @@ package org.apache.parquet.avro;
 
 import java.io.IOException;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -30,7 +32,11 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 /**
  * Write Avro records to a Parquet file.
  */
-public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T> {
+public class AvroParquetWriter<T> extends ParquetWriter<T> {
+
+  public static <T> Builder<T> builder(Path file) {
+    return new Builder<T>(file);
+  }
 
   /** Create a new {@link AvroParquetWriter}.
    *
@@ -41,10 +47,11 @@ public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T>
    * @param pageSize
    * @throws IOException
    */
+  @Deprecated
   public AvroParquetWriter(Path file, Schema avroSchema,
       CompressionCodecName compressionCodecName, int blockSize,
       int pageSize) throws IOException {
-    super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
+    super(file, AvroParquetWriter.<T>writeSupport(avroSchema, SpecificData.get()),
 	      compressionCodecName, blockSize, pageSize);
   }
 
@@ -58,10 +65,11 @@ public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T>
    * @param enableDictionary Whether to use a dictionary to compress columns.
    * @throws IOException
    */
+  @Deprecated
   public AvroParquetWriter(Path file, Schema avroSchema,
                            CompressionCodecName compressionCodecName, int blockSize,
                            int pageSize, boolean enableDictionary) throws IOException {
-    super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
+    super(file, AvroParquetWriter.<T>writeSupport(avroSchema, SpecificData.get()),
         compressionCodecName, blockSize, pageSize, enableDictionary,
         DEFAULT_IS_VALIDATING_ENABLED);
   }
@@ -73,9 +81,10 @@ public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T>
    * @param avroSchema The schema to write with.
    * @throws IOException
    */
+  @Deprecated
   public AvroParquetWriter(Path file, Schema avroSchema) throws IOException {
     this(file, avroSchema, CompressionCodecName.UNCOMPRESSED,
-	  DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+        DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
   }
 
   /** Create a new {@link AvroParquetWriter}.
@@ -89,18 +98,126 @@ public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T>
    * @param conf The Configuration to use.
    * @throws IOException
    */
+  @Deprecated
   public AvroParquetWriter(Path file, Schema avroSchema,
                            CompressionCodecName compressionCodecName,
                            int blockSize, int pageSize, boolean enableDictionary,
                            Configuration conf) throws IOException {
-    super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
-        compressionCodecName, blockSize, pageSize, pageSize, enableDictionary,
-        DEFAULT_IS_VALIDATING_ENABLED, DEFAULT_WRITER_VERSION, conf);
+    this(file, AvroParquetWriter.<T>writeSupport(avroSchema, SpecificData.get()),
+        compressionCodecName, blockSize, pageSize,
+        enableDictionary, DEFAULT_IS_VALIDATING_ENABLED, DEFAULT_WRITER_VERSION,
+        conf);
+  }
+
+  /**
+   * Create a new {@link AvroParquetWriter}.
+   *
+   * @param file The file name to write to.
+   * @param writeSupport The schema to write with.
+   * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED
+   * @param blockSize the block size threshold.
+   * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+   * @param enableDictionary Whether to use a dictionary to compress columns.
+   * @param conf The Configuration to use.
+   * @throws IOException
+   */
+  AvroParquetWriter(Path file, WriteSupport<T> writeSupport,
+                           CompressionCodecName compressionCodecName,
+                           int blockSize, int pageSize, boolean enableDictionary,
+                           boolean enableValidation, WriterVersion writerVersion,
+                           Configuration conf)
+      throws IOException {
+    super(file, writeSupport, compressionCodecName, blockSize, pageSize,
+        pageSize, enableDictionary, enableValidation, writerVersion, conf);
+  }
+
+  private static <T> WriteSupport<T> writeSupport(Schema avroSchema,
+                                                  GenericData model) {
+    return new AvroWriteSupport<T>(
+        new AvroSchemaConverter().convert(avroSchema), avroSchema, model);
   }
 
-  @SuppressWarnings("unchecked")
-  private static <T> WriteSupport<T> writeSupport(Schema avroSchema) {
-    return (WriteSupport<T>) new AvroWriteSupport(
-        new AvroSchemaConverter().convert(avroSchema), avroSchema);
+  public static class Builder<T> {
+    private final Path file;
+    private Configuration conf = new Configuration();
+    private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
+    private int blockSize = DEFAULT_BLOCK_SIZE;
+    private int pageSize = DEFAULT_PAGE_SIZE;
+    private boolean enableDictionary = DEFAULT_IS_DICTIONARY_ENABLED;
+    private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
+    private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
+
+    // avro-specific
+    private Schema schema = null;
+    private GenericData model = SpecificData.get();
+
+    private Builder(Path file) {
+      this.file = file;
+    }
+
+    public Builder<T> withConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder<T> withCompressionCodec(CompressionCodecName codecName) {
+      this.codecName = codecName;
+      return this;
+    }
+
+    public Builder<T> withBlockSize(int blockSize) {
+      this.blockSize = blockSize;
+      return this;
+    }
+
+    public Builder<T> withPageSize(int pageSize) {
+      this.pageSize = pageSize;
+      return this;
+    }
+
+    public Builder<T> enableDictionaryEncoding() {
+      this.enableDictionary = true;
+      return this;
+    }
+
+    public Builder<T> withDictionaryEncoding(boolean enableDictionary) {
+      this.enableDictionary = enableDictionary;
+      return this;
+    }
+
+    public Builder<T> enableValidation() {
+      this.enableValidation = true;
+      return this;
+    }
+
+    public Builder<T> withValidation(boolean enableValidation) {
+      this.enableValidation = enableValidation;
+      return this;
+    }
+
+    public Builder<T> withWriterVersion(WriterVersion version) {
+      this.writerVersion = version;
+      return this;
+    }
+
+    public Builder<T> withSchema(Schema schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    public Builder<T> withDataModel(GenericData model) {
+      this.model = model;
+      return this;
+    }
+
+    private WriteSupport<T> getWriteSupport() {
+      return AvroParquetWriter.<T>writeSupport(schema, model);
+    }
+
+    public ParquetWriter<T> build() throws IOException {
+      return new AvroParquetWriter<T>(file, getWriteSupport(), codecName,
+          blockSize, pageSize, enableDictionary, enableValidation,
+          writerVersion, conf);
+    }
   }
 }
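
A sketch of the new builder in use, again writing the hypothetical Pojo;
each builder method shown here appears in the Builder above:

    ParquetWriter<Pojo> writer = AvroParquetWriter.<Pojo>builder(file)
        .withSchema(ReflectData.get().getSchema(Pojo.class))
        .withDataModel(ReflectData.get())   // write via reflection
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build();
    try {
      writer.write(new Pojo());
    } finally {
      writer.close();
    }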

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index 9f1ba46..bf12cf8 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -21,7 +21,7 @@ package org.apache.parquet.avro;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.parquet.hadoop.api.ReadSupport;
@@ -29,11 +29,11 @@ import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
 
 /**
- * Avro implementation of {@link ReadSupport} for Avro {@link IndexedRecord}s which cover both Avro Specific and
- * Generic. Users should use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using
- * this class directly.
+ * Avro implementation of {@link ReadSupport} for Avro generic, specific, and
+ * reflect models. Use {@link AvroParquetReader} or
+ * {@link AvroParquetInputFormat} rather than using this class directly.
  */
-public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
+public class AvroReadSupport<T> extends ReadSupport<T> {
 
   public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
   private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
@@ -43,7 +43,10 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
   static final String OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema";
   private static final String AVRO_READ_SCHEMA_METADATA_KEY = "avro.read.schema";
 
-  public static String AVRO_DATA_SUPPLIER = "parquet.avro.data.supplier";
+  public static final String AVRO_DATA_SUPPLIER = "parquet.avro.data.supplier";
+
+  public static final String AVRO_COMPATIBILITY = "parquet.avro.compatible";
+  public static final boolean AVRO_DEFAULT_COMPATIBILITY = true;
 
   /**
    * @see org.apache.parquet.avro.AvroParquetInputFormat#setRequestedProjection(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
@@ -69,18 +72,23 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
                           Map<String, String> keyValueMetaData,
                           MessageType fileSchema) {
     MessageType projection = fileSchema;
-    Map<String, String> metadata = null;
+    Map<String, String> metadata = new LinkedHashMap<String, String>();
 
     String requestedProjectionString = configuration.get(AVRO_REQUESTED_PROJECTION);
     if (requestedProjectionString != null) {
       Schema avroRequestedProjection = new Schema.Parser().parse(requestedProjectionString);
       projection = new AvroSchemaConverter(configuration).convert(avroRequestedProjection);
     }
+
     String avroReadSchema = configuration.get(AVRO_READ_SCHEMA);
     if (avroReadSchema != null) {
-      metadata = new LinkedHashMap<String, String>();
       metadata.put(AVRO_READ_SCHEMA_METADATA_KEY, avroReadSchema);
     }
+
+    if (configuration.getBoolean(AVRO_COMPATIBILITY, AVRO_DEFAULT_COMPATIBILITY)) {
+      metadata.put(AVRO_COMPATIBILITY, "true");
+    }
+
     return new ReadContext(projection, metadata);
   }
 
@@ -88,10 +96,11 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
   public RecordMaterializer<T> prepareForRead(
       Configuration configuration, Map<String, String> keyValueMetaData,
       MessageType fileSchema, ReadContext readContext) {
+    Map<String, String> metadata = readContext.getReadSupportMetadata();
     MessageType parquetSchema = readContext.getRequestedSchema();
     Schema avroSchema;
-    if (readContext.getReadSupportMetadata() != null &&
-        readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
+
+    if (readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
       // use the Avro read schema provided by the user
       avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY));
     } else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) {
@@ -104,10 +113,25 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
       // default to converting the Parquet schema into an Avro schema
       avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
     }
-    Class<? extends AvroDataSupplier> suppClass = configuration.getClass(AVRO_DATA_SUPPLIER,
-        SpecificDataSupplier.class,
-        AvroDataSupplier.class);
-    AvroDataSupplier supplier =ReflectionUtils.newInstance(suppClass, configuration);
-    return new AvroRecordMaterializer<T>(parquetSchema, avroSchema, supplier.get());
+
+    GenericData model = getDataModel(configuration);
+    String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY);
+    if (compatEnabled != null && Boolean.valueOf(compatEnabled)) {
+      return newCompatMaterializer(parquetSchema, avroSchema, model);
+    }
+    return new AvroRecordMaterializer<T>(parquetSchema, avroSchema, model);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> RecordMaterializer<T> newCompatMaterializer(
+      MessageType parquetSchema, Schema avroSchema, GenericData model) {
+    return (RecordMaterializer<T>) new AvroCompatRecordMaterializer(
+        parquetSchema, avroSchema, model);
+  }
+
+  private static GenericData getDataModel(Configuration conf) {
+    Class<? extends AvroDataSupplier> suppClass = conf.getClass(
+        AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
+    return ReflectionUtils.newInstance(suppClass, conf).get();
   }
 }
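
The compatibility switch above is written into the read-support metadata
during init() and consulted again in prepareForRead(), so a single
configuration value controls which materializer is built. A brief sketch
of flipping it (the default, true, keeps the old IndexedRecord behavior
via AvroCompatRecordMaterializer):

    Configuration conf = new Configuration();
    // false selects the new AvroRecordConverter, which can materialize
    // generic, specific, or reflected objects
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);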

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
new file mode 100644
index 0000000..8475825
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -0,0 +1,827 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
+import it.unimi.dsi.fastutil.bytes.ByteArrayList;
+import it.unimi.dsi.fastutil.chars.CharArrayList;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.shorts.ShortArrayList;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+/**
+ * This {@link Converter} class materializes records for a given
+ * {@link GenericData Avro data model}. This replaces
+ * {@link AvroIndexedRecordConverter} and works with generic, specific, and
+ * reflect records.
+ *
+ * @param <T> the Java type of the records this converter materializes
+ */
+class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
+
+  protected T currentRecord;
+  private final Converter[] converters;
+
+  private final Schema avroSchema;
+
+  private final GenericData model;
+  private final Map<Schema.Field, Object> recordDefaults = new HashMap<Schema.Field, Object>();
+
+  public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema,
+                             GenericData baseModel) {
+    this(null, parquetSchema, avroSchema, baseModel);
+  }
+
+  public AvroRecordConverter(ParentValueContainer parent,
+                             GroupType parquetSchema, Schema avroSchema,
+                             GenericData model) {
+    super(parent);
+    this.avroSchema = avroSchema;
+    this.model = (model == null ? ReflectData.get() : model);
+    this.converters = new Converter[parquetSchema.getFieldCount()];
+
+    Map<String, Integer> avroFieldIndexes = new HashMap<String, Integer>();
+    int avroFieldIndex = 0;
+    for (Schema.Field field: avroSchema.getFields()) {
+        avroFieldIndexes.put(field.name(), avroFieldIndex++);
+    }
+
+    int parquetFieldIndex = 0;
+    for (Type parquetField: parquetSchema.getFields()) {
+      final Schema.Field avroField = getAvroField(parquetField.getName());
+      Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema());
+      final int finalAvroIndex = avroFieldIndexes.remove(avroField.name());
+      converters[parquetFieldIndex++] = newConverter(
+          nonNullSchema, parquetField, this.model, new ParentValueContainer() {
+        @Override
+        public void add(Object value) {
+          AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value);
+        }
+      });
+    }
+
+    // store defaults for any new Avro fields from avroSchema that are not in
+    // the writer schema (parquetSchema)
+    for (String fieldName : avroFieldIndexes.keySet()) {
+      Schema.Field field = avroSchema.getField(fieldName);
+      if (field.schema().getType() == Schema.Type.NULL) {
+        continue; // skip null since Parquet does not write nulls
+      }
+      if (field.defaultValue() == null || this.model.getDefaultValue(field) == null) {
+        continue; // field has no default
+      }
+      // use this.model because model may be null
+      recordDefaults.put(field, this.model.getDefaultValue(field));
+    }
+  }
+
+  private Schema.Field getAvroField(String parquetFieldName) {
+    Schema.Field avroField = avroSchema.getField(parquetFieldName);
+    if (avroField != null) {
+      return avroField;
+    }
+
+    for (Schema.Field f : avroSchema.getFields()) {
+      if (f.aliases().contains(parquetFieldName)) {
+        return f;
+      }
+    }
+
+    throw new InvalidRecordException(String.format(
+        "Parquet/Avro schema mismatch: Avro field '%s' not found",
+        parquetFieldName));
+  }
+
+  private static Converter newConverter(Schema schema, Type type,
+      GenericData model, ParentValueContainer parent) {
+    if (schema.getType().equals(Schema.Type.BOOLEAN)) {
+      return new AvroConverters.FieldBooleanConverter(parent);
+    } else if (schema.getType().equals(Schema.Type.INT)) {
+      Class<?> datumClass = getDatumClass(schema, model);
+      if (datumClass == null) {
+        return new AvroConverters.FieldIntegerConverter(parent);
+      } else if (datumClass == byte.class || datumClass == Byte.class) {
+        return new AvroConverters.FieldByteConverter(parent);
+      } else if (datumClass == short.class || datumClass == Short.class) {
+        return new AvroConverters.FieldShortConverter(parent);
+      } else if (datumClass == char.class || datumClass == Character.class) {
+        return new AvroConverters.FieldCharConverter(parent);
+      }
+      return new AvroConverters.FieldIntegerConverter(parent);
+    } else if (schema.getType().equals(Schema.Type.LONG)) {
+      return new AvroConverters.FieldLongConverter(parent);
+    } else if (schema.getType().equals(Schema.Type.FLOAT)) {
+      return new AvroConverters.FieldFloatConverter(parent);
+    } else if (schema.getType().equals(Schema.Type.DOUBLE)) {
+      return new AvroConverters.FieldDoubleConverter(parent);
+    } else if (schema.getType().equals(Schema.Type.BYTES)) {
+      Class<?> datumClass = getDatumClass(schema, model);
+      if (datumClass == null) {
+        return new AvroConverters.FieldByteBufferConverter(parent);
+      } else if (datumClass.isArray() && datumClass.getComponentType() == byte.class) {
+        return new AvroConverters.FieldByteArrayConverter(parent);
+      }
+      return new AvroConverters.FieldByteBufferConverter(parent);
+    } else if (schema.getType().equals(Schema.Type.STRING)) {
+      return new AvroConverters.FieldStringConverter(parent);
+    } else if (schema.getType().equals(Schema.Type.RECORD)) {
+      return new AvroRecordConverter(parent, type.asGroupType(), schema, model);
+    } else if (schema.getType().equals(Schema.Type.ENUM)) {
+      return new AvroConverters.FieldEnumConverter(parent, schema, model);
+    } else if (schema.getType().equals(Schema.Type.ARRAY)) {
+      Class<?> datumClass = getDatumClass(schema, model);
+      if (datumClass != null && datumClass.isArray()) {
+        return new AvroArrayConverter(
+            parent, type.asGroupType(), schema, model, datumClass);
+      } else {
+        return new AvroCollectionConverter(
+            parent, type.asGroupType(), schema, model, datumClass);
+      }
+    } else if (schema.getType().equals(Schema.Type.MAP)) {
+      return new MapConverter(parent, type.asGroupType(), schema, model);
+    } else if (schema.getType().equals(Schema.Type.UNION)) {
+      return new AvroUnionConverter(parent, type, schema, model);
+    } else if (schema.getType().equals(Schema.Type.FIXED)) {
+      return new AvroConverters.FieldFixedConverter(parent, schema, model);
+    }
+    throw new UnsupportedOperationException(String.format(
+        "Cannot convert Avro type: %s to Parquet type: %s", schema, type));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Class<T> getDatumClass(Schema schema, GenericData model) {
+    if (model instanceof SpecificData) {
+      // this works for reflect as well
+      return ((SpecificData) model).getClass(schema);
+
+    } else if (model.getClass() == GenericData.class) {
+      return null;
+
+    } else {
+      // try to use reflection (for ThriftData and others)
+      Class<? extends GenericData> modelClass = model.getClass();
+      Method getClassMethod;
+      try {
+        getClassMethod = modelClass.getMethod("getClass", Schema.class);
+      } catch (NoSuchMethodException e) {
+        return null; // no getClass method
+      }
+
+      try {
+        return (Class<T>) getClassMethod.invoke(model, schema);
+      } catch (IllegalAccessException e) {
+        return null;
+      } catch (InvocationTargetException e) {
+        return null;
+      }
+    }
+  }
+
+  protected void set(String name, int avroIndex, Object value) {
+    model.setField(currentRecord, name, avroIndex, value);
+  }
+
+  @Override
+  public Converter getConverter(int fieldIndex) {
+    return converters[fieldIndex];
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void start() {
+    this.currentRecord = (T) model.newRecord(null, avroSchema);
+  }
+
+  @Override
+  public void end() {
+    fillInDefaults();
+    if (parent != null) {
+      parent.add(currentRecord);
+    }
+  }
+
+  private void fillInDefaults() {
+    for (Map.Entry<Schema.Field, Object> entry : recordDefaults.entrySet()) {
+      Schema.Field f = entry.getKey();
+      // replace following with model.deepCopy once AVRO-1455 is being used
+      Object defaultValue = deepCopy(f.schema(), entry.getValue());
+      set(f.name(), f.pos(), defaultValue);
+    }
+  }
+
+  private Object deepCopy(Schema schema, Object value) {
+    switch (schema.getType()) {
+      case BOOLEAN:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return value;
+      default:
+        return model.deepCopy(schema, value);
+    }
+  }
+
+  T getCurrentRecord() {
+    return currentRecord;
+  }
+
+  /**
+   * Converter for a list to a Java Collection.
+   *
+   * <pre>
+   *   optional group the_list (LIST) { <-- this layer
+   *     repeated group array {
+   *       optional (type) element;
+   *     }
+   *   }
+   * </pre>
+   *
+   * This class also implements LIST element backward-compatibility rules.
+   */
+  static final class AvroCollectionConverter extends GroupConverter {
+
+    private final ParentValueContainer parent;
+    private final Schema avroSchema;
+    private final Converter converter;
+    private Class<?> containerClass;
+    private Collection<Object> container;
+
+    public AvroCollectionConverter(ParentValueContainer parent, GroupType type,
+                                   Schema avroSchema, GenericData model,
+                                   Class<?> containerClass) {
+      this.parent = parent;
+      this.avroSchema = avroSchema;
+      this.containerClass = containerClass;
+      Schema elementSchema = this.avroSchema.getElementType();
+      Type repeatedType = type.getType(0);
+      // always determine whether the repeated type is the element type by
+      // matching it against the element schema.
+      if (isElementType(repeatedType, elementSchema)) {
+        // the element type is the repeated type (and required)
+        converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
+          @Override
+          @SuppressWarnings("unchecked")
+          public void add(Object value) {
+            container.add(value);
+          }
+        });
+      } else {
+        // the element is wrapped in a synthetic group and may be optional
+        converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model);
+      }
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      return converter;
+    }
+
+    @Override
+    public void start() {
+      container = newContainer();
+    }
+
+    @Override
+    public void end() {
+      parent.add(container);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Collection<Object> newContainer() {
+      if (containerClass == null) {
+        return new GenericData.Array<Object>(0, avroSchema);
+      } else if (containerClass.isAssignableFrom(ArrayList.class)) {
+        return new ArrayList<Object>();
+      } else {
+        // no need to use the data model to instantiate because it already
+        // resolved the class, which used the correct ClassLoader
+        return (Collection<Object>) ReflectData.newInstance(containerClass, avroSchema);
+      }
+    }
+
+    /**
+     * Converter for list elements.
+     *
+     * <pre>
+     *   optional group the_list (LIST) {
+     *     repeated group array { <-- this layer
+     *       optional (type) element;
+     *     }
+     *   }
+     * </pre>
+     */
+    final class ElementConverter extends GroupConverter {
+      private Object element;
+      private final Converter elementConverter;
+
+      public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) {
+        Type elementType = repeatedType.getType(0);
+        Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
+        this.elementConverter = newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() {
+          @Override
+          @SuppressWarnings("unchecked")
+          public void add(Object value) {
+            ElementConverter.this.element = value;
+          }
+        });
+      }
+
+      @Override
+      public Converter getConverter(int fieldIndex) {
+        Preconditions.checkArgument(
+            fieldIndex == 0, "Illegal field index: " + fieldIndex);
+        return elementConverter;
+      }
+
+      @Override
+      public void start() {
+        element = null;
+      }
+
+      @Override
+      public void end() {
+        container.add(element);
+      }
+    }
+  }
+
+  /**
+   * Converter for a list to a Java array.
+   *
+   * <pre>
+   *   optional group the_list (LIST) { <-- this layer
+   *     repeated group array {
+   *       optional (type) element;
+   *     }
+   *   }
+   * </pre>
+   *
+   * This class also implements LIST element backward-compatibility rules.
+   */
+  static final class AvroArrayConverter extends GroupConverter {
+
+    private final ParentValueContainer parent;
+    private final Schema avroSchema;
+    private final Converter converter;
+    private Class<?> elementClass;
+    private Collection<?> container;
+
+    public AvroArrayConverter(ParentValueContainer parent, GroupType type,
+                              Schema avroSchema, GenericData model,
+                              Class<?> arrayClass) {
+      this.parent = parent;
+      this.avroSchema = avroSchema;
+
+      Preconditions.checkArgument(arrayClass.isArray(),
+          "Cannot convert non-array: " + arrayClass.getName());
+      this.elementClass = arrayClass.getComponentType();
+
+      ParentValueContainer setter = createSetterAndContainer();
+      Schema elementSchema = this.avroSchema.getElementType();
+      Type repeatedType = type.getType(0);
+
+      // always determine whether the repeated type is the element type by
+      // matching it against the element schema.
+      if (isElementType(repeatedType, elementSchema)) {
+        // the element type is the repeated type (and required)
+        converter = newConverter(elementSchema, repeatedType, model, setter);
+      } else {
+        // the element is wrapped in a synthetic group and may be optional
+        converter = new PrimitiveElementConverter(
+            repeatedType.asGroupType(), elementSchema, model, setter);
+      }
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      return converter;
+    }
+
+    @Override
+    public void start() {
+      // end creates a new copy of the array so the container is safe to reuse
+      container.clear();
+    }
+
+    @Override
+    public void end() {
+      if (elementClass == boolean.class) {
+        parent.add(((BooleanArrayList) container).toBooleanArray());
+      } else if (elementClass == byte.class) {
+        parent.add(((ByteArrayList) container).toByteArray());
+      } else if (elementClass == char.class) {
+        parent.add(((CharArrayList) container).toCharArray());
+      } else if (elementClass == short.class) {
+        parent.add(((ShortArrayList) container).toShortArray());
+      } else if (elementClass == int.class) {
+        parent.add(((IntArrayList) container).toIntArray());
+      } else if (elementClass == long.class) {
+        parent.add(((LongArrayList) container).toLongArray());
+      } else if (elementClass == float.class) {
+        parent.add(((FloatArrayList) container).toFloatArray());
+      } else if (elementClass == double.class) {
+        parent.add(((DoubleArrayList) container).toDoubleArray());
+      } else {
+        parent.add(((ArrayList) container).toArray());
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private ParentValueContainer createSetterAndContainer() {
+      if (elementClass == boolean.class) {
+        final BooleanArrayList list = new BooleanArrayList();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void addBoolean(boolean value) {
+            list.add(value);
+          }
+        };
+      } else if (elementClass == byte.class) {
+        final ByteArrayList list = new ByteArrayList();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void addByte(byte value) {
+            list.add(value);
+          }
+        };
+      } else if (elementClass == char.class) {
+        final CharArrayList list = new CharArrayList();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void addChar(char value) {
+            list.add(value);
+          }
+        };
+      } else if (elementClass == short.class) {
+        final ShortArrayList list = new ShortArrayList();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void addShort(short value) {
+            list.add(value);
+          }
+        };
+      } else if (elementClass == int.class) {
+        final IntArrayList list = new IntArrayList();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void addInt(int value) {
+            list.add(value);
+          }
+        };
+      } else if (elementClass == long.class) {
+        final LongArrayList list = new LongArrayList();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void addLong(long value) {
+            list.add(value);
+          }
+        };
+      } else if (elementClass == float.class) {
+        final FloatArrayList list = new FloatArrayList();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void addFloat(float value) {
+            list.add(value);
+          }
+        };
+      } else if (elementClass == double.class) {
+        final DoubleArrayList list = new DoubleArrayList();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void addDouble(double value) {
+            list.add(value);
+          }
+        };
+      } else {
+        // this will end up as Object[]
+        final List<Object> list = new ArrayList<Object>();
+        this.container = list;
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            list.add(value);
+          }
+        };
+      }
+    }
+
+    /**
+     * Converter for primitive list elements.
+     *
+     * <pre>
+     *   optional group the_list (LIST) {
+     *     repeated group array { <-- this layer
+     *       optional (type) element;
+     *     }
+     *   }
+     * </pre>
+     */
+    final class PrimitiveElementConverter extends GroupConverter {
+      private boolean isSet;
+      private final Converter elementConverter;
+
+      public PrimitiveElementConverter(GroupType repeatedType,
+                                       Schema elementSchema, GenericData model,
+                                       final ParentValueContainer setter) {
+        Type elementType = repeatedType.getType(0);
+        Preconditions.checkArgument(
+            !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED),
+            "Cannot convert list of optional elements to primitive array");
+        Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
+        this.elementConverter = newConverter(
+            nonNullElementSchema, elementType, model, new ParentValueContainer() {
+              @Override
+              public void add(Object value) {
+                isSet = true;
+                setter.add(value);
+              }
+
+              @Override
+              public void addByte(byte value) {
+                isSet = true;
+                setter.addByte(value);
+              }
+
+              @Override
+              public void addBoolean(boolean value) {
+                isSet = true;
+                setter.addBoolean(value);
+              }
+
+              @Override
+              public void addChar(char value) {
+                isSet = true;
+                setter.addChar(value);
+              }
+
+              @Override
+              public void addShort(short value) {
+                isSet = true;
+                setter.addShort(value);
+              }
+
+              @Override
+              public void addInt(int value) {
+                isSet = true;
+                setter.addInt(value);
+              }
+
+              @Override
+              public void addLong(long value) {
+                isSet = true;
+                setter.addLong(value);
+              }
+
+              @Override
+              public void addFloat(float value) {
+                isSet = true;
+                setter.addFloat(value);
+              }
+
+              @Override
+              public void addDouble(double value) {
+                isSet = true;
+                setter.addDouble(value);
+              }
+            });
+      }
+
+      @Override
+      public Converter getConverter(int fieldIndex) {
+        Preconditions.checkArgument(
+            fieldIndex == 0, "Illegal field index: " + fieldIndex);
+        return elementConverter;
+      }
+
+      @Override
+      public void start() {
+        isSet = false;
+      }
+
+      @Override
+      public void end() {
+        if (!isSet) {
+          container.add(null);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns whether the given type is the element type of a list or is a
+   * synthetic group with one field that is the element type. This is
+   * determined by checking whether the type can be a synthetic group and by
+   * checking whether a potential synthetic group matches the expected schema.
+   * <p>
+   * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this
+   * method never guesses because the expected schema is known.
+   *
+   * @param repeatedType a type that may be the element type
+   * @param elementSchema the expected Schema for list elements
+   * @return {@code true} if the repeatedType is the element type
+   */
+  private static boolean isElementType(Type repeatedType, Schema elementSchema) {
+    if (repeatedType.isPrimitive() ||
+        repeatedType.asGroupType().getFieldCount() > 1) {
+      // The repeated type must be the element type because it is an invalid
+      // synthetic wrapper (must be a group with one field).
+      return true;
+    } else if (elementSchema != null &&
+        elementSchema.getType() == Schema.Type.RECORD &&
+        elementSchema.getFields().size() == 1 &&
+        elementSchema.getFields().get(0).name().equals(
+            repeatedType.asGroupType().getFieldName(0))) {
+      // The repeated type must be the element type because it matches the
+      // structure of the Avro element's schema.
+      return true;
+    }
+    return false;
+  }
+
+  static final class AvroUnionConverter extends AvroConverters.AvroGroupConverter {
+    private final Converter[] memberConverters;
+    private Object memberValue = null;
+
+    public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema,
+                              Schema avroSchema, GenericData model) {
+      super(parent);
+      GroupType parquetGroup = parquetSchema.asGroupType();
+      this.memberConverters = new Converter[parquetGroup.getFieldCount()];
+
+      int parquetIndex = 0;
+      for (int index = 0; index < avroSchema.getTypes().size(); index++) {
+        Schema memberSchema = avroSchema.getTypes().get(index);
+        if (!memberSchema.getType().equals(Schema.Type.NULL)) {
+          Type memberType = parquetGroup.getType(parquetIndex);
+          memberConverters[parquetIndex] = newConverter(memberSchema, memberType, model, new ParentValueContainer() {
+            @Override
+            public void add(Object value) {
+              Preconditions.checkArgument(
+                  AvroUnionConverter.this.memberValue == null,
+                  "Union is resolving to more than one type");
+              memberValue = value;
+            }
+          });
+          parquetIndex++; // note: for nulls the parquetIndex is not increased
+        }
+      }
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      return memberConverters[fieldIndex];
+    }
+
+    @Override
+    public void start() {
+      memberValue = null;
+    }
+
+    @Override
+    public void end() {
+      parent.add(memberValue);
+    }
+  }
+
+  static final class MapConverter<V> extends GroupConverter {
+
+    private final ParentValueContainer parent;
+    private final Converter keyValueConverter;
+    private final Schema schema;
+    private final Class<?> mapClass;
+    private Map<String, V> map;
+
+    public MapConverter(ParentValueContainer parent, GroupType mapType,
+        Schema mapSchema, GenericData model) {
+      this.parent = parent;
+      GroupType repeatedKeyValueType = mapType.getType(0).asGroupType();
+      this.keyValueConverter = new MapKeyValueConverter(
+          repeatedKeyValueType, mapSchema, model);
+      this.schema = mapSchema;
+      this.mapClass = getDatumClass(mapSchema, model);
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      return keyValueConverter;
+    }
+
+    @Override
+    public void start() {
+      this.map = newMap();
+    }
+
+    @Override
+    public void end() {
+      parent.add(map);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, V> newMap() {
+      if (mapClass == null || mapClass.isAssignableFrom(HashMap.class)) {
+        return new HashMap<String, V>();
+      } else {
+        return (Map<String, V>) ReflectData.newInstance(mapClass, schema);
+      }
+    }
+
+    final class MapKeyValueConverter extends GroupConverter {
+
+      private String key;
+      private V value;
+      private final Converter keyConverter;
+      private final Converter valueConverter;
+
+      public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema,
+          GenericData model) {
+        keyConverter = new PrimitiveConverter() {
+          @Override
+          final public void addBinary(Binary value) {
+            key = value.toStringUsingUTF8();
+          }
+        };
+
+        Type valueType = keyValueType.getType(1);
+        Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType());
+        valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() {
+          @Override
+          @SuppressWarnings("unchecked")
+          public void add(Object value) {
+            MapKeyValueConverter.this.value = (V) value;
+          }
+        });
+      }
+
+      @Override
+      public Converter getConverter(int fieldIndex) {
+        if (fieldIndex == 0) {
+          return keyConverter;
+        } else if (fieldIndex == 1) {
+          return valueConverter;
+        }
+        throw new IllegalArgumentException("only the key (0) and value (1) fields expected: " + fieldIndex);
+      }
+
+      @Override
+      public void start() {
+        key = null;
+        value = null;
+      }
+
+      @Override
+      public void end() {
+        map.put(key, value);
+      }
+    }
+  }
+}
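
A minimal sketch (not part of the commit) of the reflective fallback in
getDatumClass above: any data model whose class exposes a getClass(Schema)
method, as the comment notes for ThriftData, can be probed at runtime. The
class and method names below are illustrative only.

import java.lang.reflect.Method;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class DatumClassLookup {
  // mirror the fallback path: look up getClass(Schema) on the model's class
  // and invoke it with the model instance as the target
  static Class<?> datumClassOf(GenericData model, Schema schema) {
    try {
      Method getClassMethod =
          model.getClass().getMethod("getClass", Schema.class);
      return (Class<?>) getClassMethod.invoke(model, schema);
    } catch (ReflectiveOperationException e) {
      return null; // same as the converter: no datum class available
    }
  }
}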

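The two LIST layouts that isElementType distinguishes can be spelled out with
MessageTypeParser; a hedged sketch, with illustrative schema strings:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ListLayouts {
  public static void main(String[] args) {
    // the repeated type is the element itself: primitive, so the first
    // rule in isElementType applies and no wrapper group is assumed
    MessageType direct = MessageTypeParser.parseMessageType(
        "message m { optional group the_list (LIST) {"
        + " repeated int32 element; } }");

    // the element is wrapped in a synthetic single-field group, so the
    // converter routes it through ElementConverter and it may be optional
    MessageType wrapped = MessageTypeParser.parseMessageType(
        "message m { optional group the_list (LIST) {"
        + " repeated group array { optional int32 element; } } }");

    System.out.println(direct);
    System.out.println(wrapped);
  }
}
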
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
index 1794929..5a5776f 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
@@ -20,18 +20,17 @@ package org.apache.parquet.avro;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
 
-class AvroRecordMaterializer<T extends IndexedRecord> extends RecordMaterializer<T> {
+class AvroRecordMaterializer<T> extends RecordMaterializer<T> {
 
-  private AvroIndexedRecordConverter<T> root;
+  private AvroRecordConverter<T> root;
 
   public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema,
       GenericData baseModel) {
-    this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema, baseModel);
+    this.root = new AvroRecordConverter<T>(requestedSchema, avroSchema, baseModel);
   }
 
   @Override

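Since the materializer is now generic over T and backed by AvroRecordConverter,
reading straight into plain Java objects only requires supplying a reflect
model. A sketch, assuming AvroReadSupport exposes the same AvroDataSupplier
hook as the write side; the Measurements POJO and file path are hypothetical.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.ReflectDataSupplier;

public class ReadReflect {
  public static class Measurements {   // hypothetical reflect POJO
    public int[] samples;              // filled by AvroArrayConverter
    public List<String> tags;          // filled by AvroCollectionConverter
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
    AvroParquetReader<Measurements> reader =
        new AvroParquetReader<Measurements>(conf, new Path("measurements.parquet"));
    Measurements m;
    while ((m = reader.read()) != null) {
      System.out.println(m.samples.length + " samples, tags=" + m.tags);
    }
    reader.close();
  }
}
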
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 2ec8ee1..991e956 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -35,13 +35,22 @@ import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.Preconditions;
 
 /**
- * Avro implementation of {@link WriteSupport} for {@link IndexedRecord}s - both Avro Generic and Specific.
- * Users should use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using
- * this class directly.
+ * Avro implementation of {@link WriteSupport} for generic, specific, and
+ * reflect models. Use {@link AvroParquetWriter} or
+ * {@link AvroParquetOutputFormat} rather than using this class directly.
  */
-public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
+public class AvroWriteSupport<T> extends WriteSupport<T> {
+
+  public static final String AVRO_DATA_SUPPLIER = "parquet.avro.write.data.supplier";
+
+  public static void setAvroDataSupplier(
+      Configuration configuration, Class<? extends AvroDataSupplier> suppClass) {
+    configuration.set(AVRO_DATA_SUPPLIER, suppClass.getName());
+  }
 
   static final String AVRO_SCHEMA = "parquet.avro.schema";
   private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
@@ -49,13 +58,26 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
   private RecordConsumer recordConsumer;
   private MessageType rootSchema;
   private Schema rootAvroSchema;
+  private GenericData model;
 
   public AvroWriteSupport() {
   }
 
+  /**
+   * @deprecated use {@link #AvroWriteSupport(MessageType, Schema, GenericData)}
+   */
+  @Deprecated
   public AvroWriteSupport(MessageType schema, Schema avroSchema) {
     this.rootSchema = schema;
     this.rootAvroSchema = avroSchema;
+    this.model = null;
+  }
+
+  public AvroWriteSupport(MessageType schema, Schema avroSchema,
+                          GenericData model) {
+    this.rootSchema = schema;
+    this.rootAvroSchema = avroSchema;
+    this.model = model;
   }
 
   /**
@@ -68,8 +90,11 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
   @Override
   public WriteContext init(Configuration configuration) {
     if (rootAvroSchema == null) {
-      rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA));
-      rootSchema = new AvroSchemaConverter().convert(rootAvroSchema);
+      this.rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA));
+      this.rootSchema = new AvroSchemaConverter().convert(rootAvroSchema);
+    }
+    if (model == null) {
+      this.model = getDataModel(configuration);
     }
     Map<String, String> extraMetaData = new HashMap<String, String>();
     extraMetaData.put(AvroReadSupport.AVRO_SCHEMA_METADATA_KEY, rootAvroSchema.toString());
@@ -81,22 +106,30 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
     this.recordConsumer = recordConsumer;
   }
 
-  @Override
+  // overloaded version for backward compatibility
+  @SuppressWarnings("unchecked")
   public void write(IndexedRecord record) {
     recordConsumer.startMessage();
     writeRecordFields(rootSchema, rootAvroSchema, record);
     recordConsumer.endMessage();
   }
 
+  @Override
+  public void write(T record) {
+    recordConsumer.startMessage();
+    writeRecordFields(rootSchema, rootAvroSchema, record);
+    recordConsumer.endMessage();
+  }
+
   private void writeRecord(GroupType schema, Schema avroSchema,
-                           IndexedRecord record) {
+                           Object record) {
     recordConsumer.startGroup();
     writeRecordFields(schema, avroSchema, record);
     recordConsumer.endGroup();
   }
 
   private void writeRecordFields(GroupType schema, Schema avroSchema,
-                                 IndexedRecord record) {
+                                 Object record) {
     List<Type> fields = schema.getFields();
     List<Schema.Field> avroFields = avroSchema.getFields();
     int index = 0; // parquet ignores Avro nulls, so index may differ
@@ -106,7 +139,7 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
         continue;
       }
       Type fieldType = fields.get(index);
-      Object value = record.get(avroIndex);
+      Object value = model.getField(record, avroField.name(), avroIndex);
       if (value != null) {
         recordConsumer.startField(fieldType.getName(), index);
         writeValue(fieldType, avroField.schema(), value);
@@ -118,17 +151,165 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
     }
   }
 
-  private <T> void writeArray(GroupType schema, Schema avroSchema,
-                              Collection<T> array) {
+  private void writeArray(GroupType schema, Schema avroSchema, Object value) {
     recordConsumer.startGroup(); // group wrapper (original type LIST)
+    if (value instanceof Collection) {
+      writeCollection(schema, avroSchema, (Collection) value);
+    } else {
+      Class<?> arrayClass = value.getClass();
+      Preconditions.checkArgument(arrayClass.isArray(),
+          "Cannot write unless collection or array: " + arrayClass.getName());
+      writeJavaArray(schema, avroSchema, arrayClass, value);
+    }
+    recordConsumer.endGroup();
+  }
+
+  private void writeJavaArray(GroupType schema, Schema avroSchema,
+                              Class<?> arrayClass, Object value) {
+    Class<?> elementClass = arrayClass.getComponentType();
+
+    if (!elementClass.isPrimitive()) {
+      Object[] array = (Object[]) value;
+      if (array.length > 0) {
+        recordConsumer.startField("array", 0);
+        for (Object element : array) {
+          writeValue(schema.getType(0), avroSchema.getElementType(), element);
+        }
+        recordConsumer.endField("array", 0);
+      }
+      return;
+    }
+
+    switch (avroSchema.getElementType().getType()) {
+      case BOOLEAN:
+        Preconditions.checkArgument(elementClass == boolean.class,
+            "Cannot write as boolean array: " + arrayClass.getName());
+        writeBooleanArray((boolean[]) value);
+        break;
+      case INT:
+        if (elementClass == byte.class) {
+          writeByteArray((byte[]) value);
+        } else if (elementClass == char.class) {
+          writeCharArray((char[]) value);
+        } else if (elementClass == short.class) {
+          writeShortArray((short[]) value);
+        } else if (elementClass == int.class) {
+          writeIntArray((int[]) value);
+        } else {
+          throw new IllegalArgumentException(
+              "Cannot write as an int array: " + arrayClass.getName());
+        }
+        break;
+      case LONG:
+        Preconditions.checkArgument(elementClass == long.class,
+            "Cannot write as long array: " + arrayClass.getName());
+        writeLongArray((long[]) value);
+        break;
+      case FLOAT:
+        Preconditions.checkArgument(elementClass == float.class,
+            "Cannot write as float array: " + arrayClass.getName());
+        writeFloatArray((float[]) value);
+        break;
+      case DOUBLE:
+        Preconditions.checkArgument(elementClass == double.class,
+            "Cannot write as double array: " + arrayClass.getName());
+        writeDoubleArray((double[]) value);
+        break;
+      default:
+        throw new IllegalArgumentException("Cannot write " +
+            avroSchema.getElementType() + " array: " + arrayClass.getName());
+    }
+  }
+
+  private void writeCollection(GroupType schema, Schema avroSchema,
+                               Collection<?> array) {
     if (array.size() > 0) {
       recordConsumer.startField("array", 0);
-      for (T elt : array) {
+      for (Object elt : array) {
         writeValue(schema.getType(0), avroSchema.getElementType(), elt);
       }
       recordConsumer.endField("array", 0);
     }
-    recordConsumer.endGroup();
+  }
+
+  private void writeBooleanArray(boolean[] array) {
+    if (array.length > 0) {
+      recordConsumer.startField("array", 0);
+      for (boolean element : array) {
+        recordConsumer.addBoolean(element);
+      }
+      recordConsumer.endField("array", 0);
+    }
+  }
+
+  private void writeByteArray(byte[] array) {
+    if (array.length > 0) {
+      recordConsumer.startField("array", 0);
+      for (byte element : array) {
+        recordConsumer.addInteger(element);
+      }
+      recordConsumer.endField("array", 0);
+    }
+  }
+
+  private void writeShortArray(short[] array) {
+    if (array.length > 0) {
+      recordConsumer.startField("array", 0);
+      for (short element : array) {
+        recordConsumer.addInteger(element);
+      }
+      recordConsumer.endField("array", 0);
+    }
+  }
+
+  private void writeCharArray(char[] array) {
+    if (array.length > 0) {
+      recordConsumer.startField("array", 0);
+      for (char element : array) {
+        recordConsumer.addInteger(element);
+      }
+      recordConsumer.endField("array", 0);
+    }
+  }
+
+  private void writeIntArray(int[] array) {
+    if (array.length > 0) {
+      recordConsumer.startField("array", 0);
+      for (int element : array) {
+        recordConsumer.addInteger(element);
+      }
+      recordConsumer.endField("array", 0);
+    }
+  }
+
+  private void writeLongArray(long[] array) {
+    if (array.length > 0) {
+      recordConsumer.startField("array", 0);
+      for (long element : array) {
+        recordConsumer.addLong(element);
+      }
+      recordConsumer.endField("array", 0);
+    }
+  }
+
+  private void writeFloatArray(float[] array) {
+    if (array.length > 0) {
+      recordConsumer.startField("array", 0);
+      for (float element : array) {
+        recordConsumer.addFloat(element);
+      }
+      recordConsumer.endField("array", 0);
+    }
+  }
+
+  private void writeDoubleArray(double[] array) {
+    if (array.length > 0) {
+      recordConsumer.startField("array", 0);
+      for (double element : array) {
+        recordConsumer.addDouble(element);
+      }
+      recordConsumer.endField("array", 0);
+    }
   }
 
   private <V> void writeMap(GroupType schema, Schema avroSchema,
@@ -168,7 +349,7 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
 
     // ResolveUnion will tell us which of the union member types to
     // deserialise.
-    int avroIndex = GenericData.get().resolveUnion(avroSchema, value);
+    int avroIndex = model.resolveUnion(avroSchema, value);
 
     // For parquet's schema we skip nulls
     GroupType parquetGroup = parquetSchema.asGroupType();
@@ -197,7 +378,11 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
     if (avroType.equals(Schema.Type.BOOLEAN)) {
       recordConsumer.addBoolean((Boolean) value);
     } else if (avroType.equals(Schema.Type.INT)) {
-      recordConsumer.addInteger(((Number) value).intValue());
+      if (value instanceof Character) {
+        recordConsumer.addInteger((Character) value);
+      } else {
+        recordConsumer.addInteger(((Number) value).intValue());
+      }
     } else if (avroType.equals(Schema.Type.LONG)) {
       recordConsumer.addLong(((Number) value).longValue());
     } else if (avroType.equals(Schema.Type.FLOAT)) {
@@ -205,19 +390,23 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
     } else if (avroType.equals(Schema.Type.DOUBLE)) {
       recordConsumer.addDouble(((Number) value).doubleValue());
     } else if (avroType.equals(Schema.Type.BYTES)) {
-      recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) value));
+      if (value instanceof byte[]) {
+        recordConsumer.addBinary(Binary.fromByteArray((byte[]) value));
+      } else {
+        recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) value));
+      }
     } else if (avroType.equals(Schema.Type.STRING)) {
       recordConsumer.addBinary(fromAvroString(value));
     } else if (avroType.equals(Schema.Type.RECORD)) {
-      writeRecord((GroupType) type, nonNullAvroSchema, (IndexedRecord) value);
+      writeRecord(type.asGroupType(), nonNullAvroSchema, value);
     } else if (avroType.equals(Schema.Type.ENUM)) {
       recordConsumer.addBinary(Binary.fromString(value.toString()));
     } else if (avroType.equals(Schema.Type.ARRAY)) {
-      writeArray((GroupType) type, nonNullAvroSchema, (Collection<?>) value);
+      writeArray(type.asGroupType(), nonNullAvroSchema, value);
     } else if (avroType.equals(Schema.Type.MAP)) {
-      writeMap((GroupType) type, nonNullAvroSchema, (Map<CharSequence, ?>) value);
+      writeMap(type.asGroupType(), nonNullAvroSchema, (Map<CharSequence, ?>) value);
     } else if (avroType.equals(Schema.Type.UNION)) {
-      writeUnion((GroupType) type, nonNullAvroSchema, value);
+      writeUnion(type.asGroupType(), nonNullAvroSchema, value);
     } else if (avroType.equals(Schema.Type.FIXED)) {
       recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes()));
     }
@@ -231,4 +420,9 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
     return Binary.fromString(value.toString());
   }
 
+  private static GenericData getDataModel(Configuration conf) {
+    Class<? extends AvroDataSupplier> suppClass = conf.getClass(
+        AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
+    return ReflectionUtils.newInstance(suppClass, conf).get();
+  }
 }
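
Putting the write path together: a sketch (not part of the commit) of job
configuration using the new AVRO_DATA_SUPPLIER property, with a hypothetical
POJO class; ReflectData derives the Avro schema that init() then converts to
a Parquet schema.

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.avro.ReflectDataSupplier;

public class WriteReflectSetup {
  public static void configure(Job job, Class<?> pojoClass) {
    Configuration conf = job.getConfiguration();
    // make AvroWriteSupport.init() build its model from ReflectDataSupplier
    AvroWriteSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
    // derive the Avro schema from the POJO and register it for the job
    Schema schema = ReflectData.get().getSchema(pojoClass);
    AvroParquetOutputFormat.setSchema(job, schema);
  }
}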

