parquet-commits mailing list archives

From: b...@apache.org
Subject: parquet-mr git commit: PARQUET-285: Implement 3-level lists in Avro
Date: Tue, 02 Jun 2015 00:46:44 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 4b5cda5a2 -> d6f082b9b


PARQUET-285: Implement 3-level lists in Avro

This includes the write-side changes from #83 that implement the 3-level list structure for parquet-avro. The old commit was https://github.com/rdblue/parquet-mr/commit/3589a7367c829b9eabc36b2e2e1cab31685415eb.
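
For context (an illustration, not part of the original message): with the default parquet.avro.write-old-list-structure=true, an Avro array of ints keeps the 1.x two-level layout; setting the property to false opts into the three-level layout from the Parquet list spec. The test expectations in this patch show both forms:

    Old (default) two-level layout:

        required group myarray (LIST) {
          repeated int32 array;
        }

    New three-level layout (parquet.avro.write-old-list-structure=false):

        required group myarray (LIST) {
          repeated group list {
            required int32 element;
          }
        }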

Author: Ryan Blue <blue@apache.org>

Closes #198 from rdblue/PARQUET-285-avro-nested-lists and squashes the following commits:

3498571 [Ryan Blue] PARQUET-285: Fix review issues.
67ed2f4 [Ryan Blue] PARQUET-285: Add tests for new list write behavior.
6ec9120 [Ryan Blue] PARQUET-285: Implement nested type rules for Avro.
109111f [Ryan Blue] PARQUET-285: Add a better conversion pattern for lists.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/d6f082b9
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/d6f082b9
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/d6f082b9

Branch: refs/heads/master
Commit: d6f082b9be5d507ff60c6bc83a179cc44015ab97
Parents: 4b5cda5
Author: Ryan Blue <blue@apache.org>
Authored: Mon Jun 1 17:46:29 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Mon Jun 1 17:46:29 2015 -0700

----------------------------------------------------------------------
 .../avro/AvroIndexedRecordConverter.java        |   3 +-
 .../apache/parquet/avro/AvroParquetWriter.java  |  12 +-
 .../parquet/avro/AvroRecordConverter.java       |   2 +-
 .../parquet/avro/AvroSchemaConverter.java       |  17 +-
 .../apache/parquet/avro/AvroWriteSupport.java   | 453 ++++++++------
 .../parquet/avro/TestAvroSchemaConverter.java   | 186 +++++-
 .../org/apache/parquet/avro/TestReadWrite.java  |  85 ++-
 .../parquet/avro/TestReadWriteOldBehavior.java  | 588 +++++++++++++++++++
 parquet-avro/src/test/resources/all.avsc        |   6 +
 .../src/test/resources/allFromParquet.avsc      |  85 ---
 .../resources/allFromParquetNewBehavior.avsc    |  91 +++
 .../resources/allFromParquetOldBehavior.avsc    | 100 ++++
 .../parquet/schema/ConversionPatterns.java      |  28 +
 13 files changed, 1359 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index 262c423..a5e4141 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -308,7 +308,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
         Schema avroSchema, GenericData model) {
       this.parent = parent;
       this.avroSchema = avroSchema;
-      Schema elementSchema = this.avroSchema.getElementType();
+      Schema elementSchema = AvroSchemaConverter
+          .getNonNull(avroSchema.getElementType());
       Type repeatedType = type.getType(0);
       // always determine whether the repeated type is the element type by
       // matching it against the element schema.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
index 7abd39a..e719afc 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -103,7 +103,8 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
                            CompressionCodecName compressionCodecName,
                            int blockSize, int pageSize, boolean enableDictionary,
                            Configuration conf) throws IOException {
-    this(file, AvroParquetWriter.<T>writeSupport(avroSchema, SpecificData.get()),
+    this(file,
+        AvroParquetWriter.<T>writeSupport(conf, avroSchema, SpecificData.get()),
         compressionCodecName, blockSize, pageSize,
         enableDictionary, DEFAULT_IS_VALIDATING_ENABLED, DEFAULT_WRITER_VERSION,
         conf);
@@ -137,6 +138,13 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
         new AvroSchemaConverter().convert(avroSchema), avroSchema, model);
   }
 
+  private static <T> WriteSupport<T> writeSupport(Configuration conf,
+                                                  Schema avroSchema,
+                                                  GenericData model) {
+    return new AvroWriteSupport<T>(
+        new AvroSchemaConverter(conf).convert(avroSchema), avroSchema, model);
+  }
+
   public static class Builder<T> {
     private final Path file;
     private Configuration conf = new Configuration();
@@ -211,7 +219,7 @@ public class AvroParquetWriter<T> extends ParquetWriter<T> {
     }
 
     private WriteSupport<T> getWriteSupport() {
-      return AvroParquetWriter.<T>writeSupport(schema, model);
+      return AvroParquetWriter.<T>writeSupport(conf, schema, model);
     }
 
     public ParquetWriter<T> build() throws IOException {
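
For illustration only (a sketch based on the test changes later in this commit; file and schema are assumed to be an org.apache.hadoop.fs.Path and an org.apache.avro.Schema): the builder now forwards its Configuration to the write support, so the new list structure can be enabled per writer:

    Configuration conf = new Configuration();
    // opt out of the old two-level layout for this writer
    conf.setBoolean("parquet.avro.write-old-list-structure", false);
    ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(file)
        .withSchema(schema)
        .withConf(conf)
        .build();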

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 8475825..ed1b97e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -293,7 +293,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
       this.parent = parent;
       this.avroSchema = avroSchema;
       this.containerClass = containerClass;
-      Schema elementSchema = this.avroSchema.getElementType();
+      Schema elementSchema = AvroSchemaConverter.getNonNull(avroSchema.getElementType());
       Type repeatedType = type.getType(0);
       // always determine whether the repeated type is the element type by
       // matching it against the element schema.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 67924b5..8f7a1af 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -32,6 +32,8 @@ import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
 import static org.apache.parquet.schema.OriginalType.*;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
 
@@ -43,19 +45,23 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
  */
 public class AvroSchemaConverter {
 
-  static final String ADD_LIST_ELEMENT_RECORDS =
+  public static final String ADD_LIST_ELEMENT_RECORDS =
       "parquet.avro.add-list-element-records";
   private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
 
   private final boolean assumeRepeatedIsListElement;
+  private final boolean writeOldListStructure;
 
   public AvroSchemaConverter() {
     this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
+    this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
   }
 
   public AvroSchemaConverter(Configuration conf) {
     this.assumeRepeatedIsListElement = conf.getBoolean(
         ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
+    this.writeOldListStructure = conf.getBoolean(
+        WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
   }
 
   /**
@@ -127,8 +133,13 @@ public class AvroSchemaConverter {
     } else if (type.equals(Schema.Type.ENUM)) {
       return primitive(fieldName, BINARY, repetition, ENUM);
     } else if (type.equals(Schema.Type.ARRAY)) {
-      return ConversionPatterns.listType(repetition, fieldName,
-          convertField("array", schema.getElementType(), Type.Repetition.REPEATED));
+      if (writeOldListStructure) {
+        return ConversionPatterns.listType(repetition, fieldName,
+            convertField("array", schema.getElementType(), Type.Repetition.REPEATED));
+      } else {
+        return ConversionPatterns.listOfElements(repetition, fieldName,
+            convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
+      }
     } else if (type.equals(Schema.Type.MAP)) {
       Type valType = convertField("value", schema.getValueType());
       // avro map key type is always string
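
A sketch of the converter side (not from the patch; avroSchema is an assumed org.apache.avro.Schema): passing a Configuration now controls both how repeated groups are interpreted on read and which list layout is produced for Avro arrays, matching the flags the tests set below:

    Configuration conf = new Configuration();
    conf.setBoolean("parquet.avro.add-list-element-records", false);
    conf.setBoolean("parquet.avro.write-old-list-structure", false);
    MessageType parquetSchema = new AvroSchemaConverter(conf).convert(avroSchema);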

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 991e956..e86c579 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -55,10 +55,22 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
   static final String AVRO_SCHEMA = "parquet.avro.schema";
   private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
 
+  public static final String WRITE_OLD_LIST_STRUCTURE =
+      "parquet.avro.write-old-list-structure";
+  static final boolean WRITE_OLD_LIST_STRUCTURE_DEFAULT = true;
+
+  private static final String MAP_REPEATED_NAME = "key_value";
+  private static final String MAP_KEY_NAME = "key";
+  private static final String MAP_VALUE_NAME = "value";
+  private static final String LIST_REPEATED_NAME = "list";
+  private static final String OLD_LIST_REPEATED_NAME = "array";
+  static final String LIST_ELEMENT_NAME = "element";
+
   private RecordConsumer recordConsumer;
   private MessageType rootSchema;
   private Schema rootAvroSchema;
   private GenericData model;
+  private ListWriter listWriter;
 
   public AvroWriteSupport() {
   }
@@ -93,9 +105,19 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
       this.rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA));
       this.rootSchema = new AvroSchemaConverter().convert(rootAvroSchema);
     }
+
     if (model == null) {
       this.model = getDataModel(configuration);
     }
+
+    boolean writeOldListStructure = configuration.getBoolean(
+        WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
+    if (writeOldListStructure) {
+      this.listWriter = new TwoLevelListWriter();
+    } else {
+      this.listWriter = new ThreeLevelListWriter();
+    }
+
     Map<String, String> extraMetaData = new HashMap<String, String>();
     extraMetaData.put(AvroReadSupport.AVRO_SCHEMA_METADATA_KEY, rootAvroSchema.toString());
     return new WriteContext(rootSchema, extraMetaData);
@@ -151,167 +173,6 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
     }
   }
 
-  private void writeArray(GroupType schema, Schema avroSchema, Object value) {
-    recordConsumer.startGroup(); // group wrapper (original type LIST)
-    if (value instanceof Collection) {
-      writeCollection(schema, avroSchema, (Collection) value);
-    } else {
-      Class<?> arrayClass = value.getClass();
-      Preconditions.checkArgument(arrayClass.isArray(),
-          "Cannot write unless collection or array: " + arrayClass.getName());
-      writeJavaArray(schema, avroSchema, arrayClass, value);
-    }
-    recordConsumer.endGroup();
-  }
-
-  private void writeJavaArray(GroupType schema, Schema avroSchema,
-                              Class<?> arrayClass, Object value) {
-    Class<?> elementClass = arrayClass.getComponentType();
-
-    if (!elementClass.isPrimitive()) {
-      Object[] array = (Object[]) value;
-      if (array.length > 0) {
-        recordConsumer.startField("array", 0);
-        for (Object element : array) {
-          writeValue(schema.getType(0), avroSchema.getElementType(), element);
-        }
-        recordConsumer.endField("array", 0);
-      }
-      return;
-    }
-
-    switch (avroSchema.getElementType().getType()) {
-      case BOOLEAN:
-        Preconditions.checkArgument(elementClass == boolean.class,
-            "Cannot write as boolean array: " + arrayClass.getName());
-        writeBooleanArray((boolean[]) value);
-        break;
-      case INT:
-        if (elementClass == byte.class) {
-          writeByteArray((byte[]) value);
-        } else if (elementClass == char.class) {
-          writeCharArray((char[]) value);
-        } else if (elementClass == short.class) {
-          writeShortArray((short[]) value);
-        } else if (elementClass == int.class) {
-          writeIntArray((int[]) value);
-        } else {
-          throw new IllegalArgumentException(
-              "Cannot write as an int array: " + arrayClass.getName());
-        }
-        break;
-      case LONG:
-        Preconditions.checkArgument(elementClass == long.class,
-            "Cannot write as long array: " + arrayClass.getName());
-        writeLongArray((long[]) value);
-        break;
-      case FLOAT:
-        Preconditions.checkArgument(elementClass == float.class,
-            "Cannot write as float array: " + arrayClass.getName());
-        writeFloatArray((float[]) value);
-        break;
-      case DOUBLE:
-        Preconditions.checkArgument(elementClass == double.class,
-            "Cannot write as double array: " + arrayClass.getName());
-        writeDoubleArray((double[]) value);
-        break;
-      default:
-        throw new IllegalArgumentException("Cannot write " +
-            avroSchema.getElementType() + " array: " + arrayClass.getName());
-    }
-  }
-
-  private void writeCollection(GroupType schema, Schema avroSchema,
-                               Collection<?> array) {
-    if (array.size() > 0) {
-      recordConsumer.startField("array", 0);
-      for (Object elt : array) {
-        writeValue(schema.getType(0), avroSchema.getElementType(), elt);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
-  private void writeBooleanArray(boolean[] array) {
-    if (array.length > 0) {
-      recordConsumer.startField("array", 0);
-      for (boolean element : array) {
-        recordConsumer.addBoolean(element);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
-  private void writeByteArray(byte[] array) {
-    if (array.length > 0) {
-      recordConsumer.startField("array", 0);
-      for (byte element : array) {
-        recordConsumer.addInteger(element);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
-  private void writeShortArray(short[] array) {
-    if (array.length > 0) {
-      recordConsumer.startField("array", 0);
-      for (short element : array) {
-        recordConsumer.addInteger(element);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
-  private void writeCharArray(char[] array) {
-    if (array.length > 0) {
-      recordConsumer.startField("array", 0);
-      for (char element : array) {
-        recordConsumer.addInteger(element);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
-  private void writeIntArray(int[] array) {
-    if (array.length > 0) {
-      recordConsumer.startField("array", 0);
-      for (int element : array) {
-        recordConsumer.addInteger(element);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
-  private void writeLongArray(long[] array) {
-    if (array.length > 0) {
-      recordConsumer.startField("array", 0);
-      for (long element : array) {
-        recordConsumer.addLong(element);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
-  private void writeFloatArray(float[] array) {
-    if (array.length > 0) {
-      recordConsumer.startField("array", 0);
-      for (float element : array) {
-        recordConsumer.addFloat(element);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
-  private void writeDoubleArray(double[] array) {
-    if (array.length > 0) {
-      recordConsumer.startField("array", 0);
-      for (double element : array) {
-        recordConsumer.addDouble(element);
-      }
-      recordConsumer.endField("array", 0);
-    }
-  }
-
   private <V> void writeMap(GroupType schema, Schema avroSchema,
                             Map<CharSequence, V> map) {
     GroupType innerGroup = schema.getType(0).asGroupType();
@@ -320,25 +181,25 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
 
     recordConsumer.startGroup(); // group wrapper (original type MAP)
     if (map.size() > 0) {
-      recordConsumer.startField("map", 0);
+      recordConsumer.startField(MAP_REPEATED_NAME, 0);
 
       for (Map.Entry<CharSequence, V> entry : map.entrySet()) {
         recordConsumer.startGroup(); // repeated group key_value, middle layer
-        recordConsumer.startField("key", 0);
+        recordConsumer.startField(MAP_KEY_NAME, 0);
         writeValue(keyType, MAP_KEY_SCHEMA, entry.getKey());
-        recordConsumer.endField("key", 0);
+        recordConsumer.endField(MAP_KEY_NAME, 0);
         V value = entry.getValue();
         if (value != null) {
-          recordConsumer.startField("value", 1);
+          recordConsumer.startField(MAP_VALUE_NAME, 1);
           writeValue(valueType, avroSchema.getValueType(), value);
-          recordConsumer.endField("value", 1);
+          recordConsumer.endField(MAP_VALUE_NAME, 1);
         } else if (!valueType.isRepetition(Type.Repetition.OPTIONAL)) {
           throw new RuntimeException("Null map value for " + avroSchema.getName());
         }
         recordConsumer.endGroup();
       }
 
-      recordConsumer.endField("map", 0);
+      recordConsumer.endField(MAP_REPEATED_NAME, 0);
     }
     recordConsumer.endGroup();
   }
@@ -402,7 +263,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
     } else if (avroType.equals(Schema.Type.ENUM)) {
       recordConsumer.addBinary(Binary.fromString(value.toString()));
     } else if (avroType.equals(Schema.Type.ARRAY)) {
-      writeArray(type.asGroupType(), nonNullAvroSchema, value);
+      listWriter.writeList(type.asGroupType(), nonNullAvroSchema, value);
     } else if (avroType.equals(Schema.Type.MAP)) {
       writeMap(type.asGroupType(), nonNullAvroSchema, (Map<CharSequence, ?>) value);
     } else if (avroType.equals(Schema.Type.UNION)) {
@@ -425,4 +286,260 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
     return ReflectionUtils.newInstance(suppClass, conf).get();
   }
+
+  private abstract class ListWriter {
+
+    protected abstract void writeCollection(
+        GroupType type, Schema schema, Collection<?> collection);
+
+    protected abstract void writeObjectArray(
+        GroupType type, Schema schema, Object[] array);
+
+    protected abstract void startArray();
+
+    protected abstract void endArray();
+
+    public void writeList(GroupType schema, Schema avroSchema, Object value) {
+      recordConsumer.startGroup(); // group wrapper (original type LIST)
+      if (value instanceof Collection) {
+        writeCollection(schema, avroSchema, (Collection) value);
+      } else {
+        Class<?> arrayClass = value.getClass();
+        Preconditions.checkArgument(arrayClass.isArray(),
+            "Cannot write unless collection or array: " + arrayClass.getName());
+        writeJavaArray(schema, avroSchema, arrayClass, value);
+      }
+      recordConsumer.endGroup();
+    }
+
+    public void writeJavaArray(GroupType schema, Schema avroSchema,
+                               Class<?> arrayClass, Object value) {
+      Class<?> elementClass = arrayClass.getComponentType();
+
+      if (!elementClass.isPrimitive()) {
+        writeObjectArray(schema, avroSchema, (Object[]) value);
+        return;
+      }
+
+      switch (avroSchema.getElementType().getType()) {
+        case BOOLEAN:
+          Preconditions.checkArgument(elementClass == boolean.class,
+              "Cannot write as boolean array: " + arrayClass.getName());
+          writeBooleanArray((boolean[]) value);
+          break;
+        case INT:
+          if (elementClass == byte.class) {
+            writeByteArray((byte[]) value);
+          } else if (elementClass == char.class) {
+            writeCharArray((char[]) value);
+          } else if (elementClass == short.class) {
+            writeShortArray((short[]) value);
+          } else if (elementClass == int.class) {
+            writeIntArray((int[]) value);
+          } else {
+            throw new IllegalArgumentException(
+                "Cannot write as an int array: " + arrayClass.getName());
+          }
+          break;
+        case LONG:
+          Preconditions.checkArgument(elementClass == long.class,
+              "Cannot write as long array: " + arrayClass.getName());
+          writeLongArray((long[]) value);
+          break;
+        case FLOAT:
+          Preconditions.checkArgument(elementClass == float.class,
+              "Cannot write as float array: " + arrayClass.getName());
+          writeFloatArray((float[]) value);
+          break;
+        case DOUBLE:
+          Preconditions.checkArgument(elementClass == double.class,
+              "Cannot write as double array: " + arrayClass.getName());
+          writeDoubleArray((double[]) value);
+          break;
+        default:
+          throw new IllegalArgumentException("Cannot write " +
+              avroSchema.getElementType() + " array: " + arrayClass.getName());
+      }
+    }
+
+    protected void writeBooleanArray(boolean[] array) {
+      if (array.length > 0) {
+        startArray();
+        for (boolean element : array) {
+          recordConsumer.addBoolean(element);
+        }
+        endArray();
+      }
+    }
+
+    protected void writeByteArray(byte[] array) {
+      if (array.length > 0) {
+        startArray();
+        for (byte element : array) {
+          recordConsumer.addInteger(element);
+        }
+        endArray();
+      }
+    }
+
+    protected void writeShortArray(short[] array) {
+      if (array.length > 0) {
+        startArray();
+        for (short element : array) {
+          recordConsumer.addInteger(element);
+        }
+        endArray();
+      }
+    }
+
+    protected void writeCharArray(char[] array) {
+      if (array.length > 0) {
+        startArray();
+        for (char element : array) {
+          recordConsumer.addInteger(element);
+        }
+        endArray();
+      }
+    }
+
+    protected void writeIntArray(int[] array) {
+      if (array.length > 0) {
+        startArray();
+        for (int element : array) {
+          recordConsumer.addInteger(element);
+        }
+        endArray();
+      }
+    }
+
+    protected void writeLongArray(long[] array) {
+      if (array.length > 0) {
+        startArray();
+        for (long element : array) {
+          recordConsumer.addLong(element);
+        }
+        endArray();
+      }
+    }
+
+    protected void writeFloatArray(float[] array) {
+      if (array.length > 0) {
+        startArray();
+        for (float element : array) {
+          recordConsumer.addFloat(element);
+        }
+        endArray();
+      }
+    }
+
+    protected void writeDoubleArray(double[] array) {
+      if (array.length > 0) {
+        startArray();
+        for (double element : array) {
+          recordConsumer.addDouble(element);
+        }
+        endArray();
+      }
+    }
+  }
+
+  /**
+   * For backward-compatibility. This preserves how lists were written in 1.x.
+   */
+  private class TwoLevelListWriter extends ListWriter {
+    @Override
+    public void writeCollection(GroupType schema, Schema avroSchema,
+                                Collection<?> array) {
+      if (array.size() > 0) {
+        recordConsumer.startField(OLD_LIST_REPEATED_NAME, 0);
+        for (Object elt : array) {
+          writeValue(schema.getType(0), avroSchema.getElementType(), elt);
+        }
+        recordConsumer.endField(OLD_LIST_REPEATED_NAME, 0);
+      }
+    }
+
+    @Override
+    protected void writeObjectArray(GroupType type, Schema schema,
+                                    Object[] array) {
+      if (array.length > 0) {
+        recordConsumer.startField(OLD_LIST_REPEATED_NAME, 0);
+        for (Object element : array) {
+          writeValue(type.getType(0), schema.getElementType(), element);
+        }
+        recordConsumer.endField(OLD_LIST_REPEATED_NAME, 0);
+      }
+    }
+
+    @Override
+    protected void startArray() {
+      recordConsumer.startField(OLD_LIST_REPEATED_NAME, 0);
+    }
+
+    @Override
+    protected void endArray() {
+      recordConsumer.endField(OLD_LIST_REPEATED_NAME, 0);
+    }
+  }
+
+  private class ThreeLevelListWriter extends ListWriter {
+    @Override
+    protected void writeCollection(GroupType type, Schema schema, Collection<?> collection) {
+      if (collection.size() > 0) {
+        recordConsumer.startField(LIST_REPEATED_NAME, 0);
+        GroupType repeatedType = type.getType(0).asGroupType();
+        Type elementType = repeatedType.getType(0);
+        for (Object element : collection) {
+          recordConsumer.startGroup(); // repeated group list, middle layer
+          if (element != null) {
+            recordConsumer.startField(LIST_ELEMENT_NAME, 0);
+            writeValue(elementType, schema.getElementType(), element);
+            recordConsumer.endField(LIST_ELEMENT_NAME, 0);
+          } else if (!elementType.isRepetition(Type.Repetition.OPTIONAL)) {
+            throw new RuntimeException(
+                "Null list element for " + schema.getName());
+          }
+          recordConsumer.endGroup();
+        }
+        recordConsumer.endField(LIST_REPEATED_NAME, 0);
+      }
+    }
+
+    @Override
+    protected void writeObjectArray(GroupType type, Schema schema,
+                                    Object[] array) {
+      if (array.length > 0) {
+        recordConsumer.startField(LIST_REPEATED_NAME, 0);
+        GroupType repeatedType = type.getType(0).asGroupType();
+        Type elementType = repeatedType.getType(0);
+        for (Object element : array) {
+          recordConsumer.startGroup(); // repeated group list, middle layer
+          if (element != null) {
+            recordConsumer.startField(LIST_ELEMENT_NAME, 0);
+            writeValue(elementType, schema.getElementType(), element);
+            recordConsumer.endField(LIST_ELEMENT_NAME, 0);
+          } else if (!elementType.isRepetition(Type.Repetition.OPTIONAL)) {
+            throw new RuntimeException(
+                "Null list element for " + schema.getName());
+          }
+          recordConsumer.endGroup();
+        }
+        recordConsumer.endField(LIST_REPEATED_NAME, 0);
+      }
+    }
+
+    @Override
+    protected void startArray() {
+      recordConsumer.startField(LIST_REPEATED_NAME, 0);
+      recordConsumer.startGroup(); // repeated group list, middle layer
+      recordConsumer.startField(LIST_ELEMENT_NAME, 0);
+    }
+
+    @Override
+    protected void endArray() {
+      recordConsumer.endField(LIST_ELEMENT_NAME, 0);
+      recordConsumer.endGroup();
+      recordConsumer.endField(LIST_REPEATED_NAME, 0);
+    }
+  }
 }
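
To make the new layout concrete, here is a sketch (not part of the patch) of the RecordConsumer events that writeList plus ThreeLevelListWriter.writeCollection produce for a list holding 1 and null, assuming an optional element type:

    recordConsumer.startGroup();              // group wrapper (LIST), from writeList
    recordConsumer.startField("list", 0);     // LIST_REPEATED_NAME
    recordConsumer.startGroup();              // repeated group for element 1
    recordConsumer.startField("element", 0);  // LIST_ELEMENT_NAME
    recordConsumer.addInteger(1);
    recordConsumer.endField("element", 0);
    recordConsumer.endGroup();
    recordConsumer.startGroup();              // repeated group for the null;
    recordConsumer.endGroup();                // no element field is emitted
    recordConsumer.endField("list", 0);
    recordConsumer.endGroup();                // end of wrapper, from writeList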

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index f8de8c2..2d8bc7e 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -22,7 +22,9 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import java.util.Arrays;
 import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
 import org.codehaus.jackson.node.NullNode;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -31,7 +33,15 @@ import static org.junit.Assert.assertEquals;
 
 public class TestAvroSchemaConverter {
 
-    public static final String ALL_PARQUET_SCHEMA =
+  private static final Configuration NEW_BEHAVIOR = new Configuration(false);
+
+  @BeforeClass
+  public static void setupConf() {
+    NEW_BEHAVIOR.setBoolean("parquet.avro.add-list-element-records", false);
+    NEW_BEHAVIOR.setBoolean("parquet.avro.write-old-list-structure", false);
+  }
+
+  public static final String ALL_PARQUET_SCHEMA =
       "message org.apache.parquet.avro.myrecord {\n" +
       "  required boolean myboolean;\n" +
       "  required int32 myint;\n" +
@@ -50,6 +60,11 @@ public class TestAvroSchemaConverter {
       "  optional group myoptionalarray (LIST) {\n" +
       "    repeated int32 array;\n" +
       "  }\n" +
+      "  required group myarrayofoptional (LIST) {\n" +
+      "    repeated group list {\n" +
+      "      optional int32 element;\n" +
+      "    }\n" +
+      "  }\n" +
       "  required group myrecordarray (LIST) {\n" +
       "    repeated group array {\n" +
       "      required int32 a;\n" +
@@ -65,27 +80,45 @@ public class TestAvroSchemaConverter {
       "  required fixed_len_byte_array(1) myfixed;\n" +
       "}\n";
 
-  private void testAvroToParquetConversion(Schema avroSchema, String schemaString) throws
-      Exception {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+  private void testAvroToParquetConversion(
+      Schema avroSchema, String schemaString) throws Exception {
+    testAvroToParquetConversion(new Configuration(false), avroSchema, schemaString);
+  }
+
+  private void testAvroToParquetConversion(
+      Configuration conf, Schema avroSchema, String schemaString)
+      throws Exception {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
     MessageType schema = avroSchemaConverter.convert(avroSchema);
     MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
     assertEquals("converting " + schema + " to " + schemaString, expectedMT.toString(),
         schema.toString());
   }
 
-  private void testParquetToAvroConversion(Schema avroSchema, String schemaString) throws
-      Exception {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+  private void testParquetToAvroConversion(
+      Schema avroSchema, String schemaString) throws Exception {
+    testParquetToAvroConversion(new Configuration(false), avroSchema, schemaString);
+  }
+
+  private void testParquetToAvroConversion(
+      Configuration conf, Schema avroSchema, String schemaString)
+      throws Exception {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
     Schema schema = avroSchemaConverter.convert(MessageTypeParser.parseMessageType
         (schemaString));
     assertEquals("converting " + schemaString + " to " + avroSchema, avroSchema.toString(),
         schema.toString());
   }
 
-  private void testRoundTripConversion(Schema avroSchema, String schemaString) throws
-      Exception {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+  private void testRoundTripConversion(
+      Schema avroSchema, String schemaString) throws Exception {
+    testRoundTripConversion(new Configuration(), avroSchema, schemaString);
+  }
+
+  private void testRoundTripConversion(
+      Configuration conf, Schema avroSchema, String schemaString)
+      throws Exception {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf);
     MessageType schema = avroSchemaConverter.convert(avroSchema);
     MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
     assertEquals("converting " + schema + " to " + schemaString, expectedMT.toString(),
@@ -105,6 +138,61 @@ public class TestAvroSchemaConverter {
     Schema schema = new Schema.Parser().parse(
         Resources.getResource("all.avsc").openStream());
     testAvroToParquetConversion(
+        NEW_BEHAVIOR, schema,
+        "message org.apache.parquet.avro.myrecord {\n" +
+            // Avro nulls are not encoded, unless they are null unions
+            "  required boolean myboolean;\n" +
+            "  required int32 myint;\n" +
+            "  required int64 mylong;\n" +
+            "  required float myfloat;\n" +
+            "  required double mydouble;\n" +
+            "  required binary mybytes;\n" +
+            "  required binary mystring (UTF8);\n" +
+            "  required group mynestedrecord {\n" +
+            "    required int32 mynestedint;\n" +
+            "  }\n" +
+            "  required binary myenum (ENUM);\n" +
+            "  required group myarray (LIST) {\n" +
+            "    repeated group list {\n" +
+            "      required int32 element;\n" +
+            "    }\n" +
+            "  }\n" +
+            "  required group myemptyarray (LIST) {\n" +
+            "    repeated group list {\n" +
+            "      required int32 element;\n" +
+            "    }\n" +
+            "  }\n" +
+            "  optional group myoptionalarray (LIST) {\n" +
+            "    repeated group list {\n" +
+            "      required int32 element;\n" +
+            "    }\n" +
+            "  }\n" +
+            "  required group myarrayofoptional (LIST) {\n" +
+            "    repeated group list {\n" +
+            "      optional int32 element;\n" +
+            "    }\n" +
+            "  }\n" +
+            "  required group mymap (MAP) {\n" +
+            "    repeated group map (MAP_KEY_VALUE) {\n" +
+            "      required binary key (UTF8);\n" +
+            "      required int32 value;\n" +
+            "    }\n" +
+            "  }\n" +
+            "  required group myemptymap (MAP) {\n" +
+            "    repeated group map (MAP_KEY_VALUE) {\n" +
+            "      required binary key (UTF8);\n" +
+            "      required int32 value;\n" +
+            "    }\n" +
+            "  }\n" +
+            "  required fixed_len_byte_array(1) myfixed;\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testAllTypesOldListBehavior() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("all.avsc").openStream());
+    testAvroToParquetConversion(
         schema,
         "message org.apache.parquet.avro.myrecord {\n" +
             // Avro nulls are not encoded, unless they are null unions
@@ -128,6 +216,9 @@ public class TestAvroSchemaConverter {
             "  optional group myoptionalarray (LIST) {\n" +
             "    repeated int32 array;\n" +
             "  }\n" +
+            "  required group myarrayofoptional (LIST) {\n" +
+            "    repeated int32 array;\n" +
+            "  }\n" +
             "  required group mymap (MAP) {\n" +
             "    repeated group map (MAP_KEY_VALUE) {\n" +
             "      required binary key (UTF8);\n" +
@@ -147,7 +238,15 @@ public class TestAvroSchemaConverter {
   @Test
   public void testAllTypesParquetToAvro() throws Exception {
     Schema schema = new Schema.Parser().parse(
-        Resources.getResource("allFromParquet.avsc").openStream());
+        Resources.getResource("allFromParquetNewBehavior.avsc").openStream());
+    // Cannot use round-trip assertion because enum is lost
+    testParquetToAvroConversion(NEW_BEHAVIOR, schema, ALL_PARQUET_SCHEMA);
+  }
+
+  @Test
+  public void testAllTypesParquetToAvroOldBehavior() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("allFromParquetOldBehavior.avsc").openStream());
     // Cannot use round-trip assertion because enum is lost
     testParquetToAvroConversion(schema, ALL_PARQUET_SCHEMA);
   }
@@ -155,14 +254,14 @@ public class TestAvroSchemaConverter {
   @Test(expected = IllegalArgumentException.class)
   public void testParquetMapWithNonStringKeyFails() throws Exception {
     MessageType parquetSchema = MessageTypeParser.parseMessageType(
-      "message myrecord {\n" +
-        "  required group mymap (MAP) {\n" +
-        "    repeated group map (MAP_KEY_VALUE) {\n" +
-        "      required int32 key;\n" +
-        "      required int32 value;\n" +
-        "    }\n" +
-        "  }\n" +
-        "}\n"
+        "message myrecord {\n" +
+            "  required group mymap (MAP) {\n" +
+            "    repeated group map (MAP_KEY_VALUE) {\n" +
+            "      required int32 key;\n" +
+            "      required int32 value;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}\n"
     );
     new AvroSchemaConverter().convert(parquetSchema);
   }
@@ -201,10 +300,28 @@ public class TestAvroSchemaConverter {
   }
 
   @Test
+  public void testOptionalArrayElement() throws Exception {
+    Schema schema = Schema.createRecord("record1", null, null, false);
+    Schema optionalIntArray = Schema.createArray(optional(Schema.create(Schema.Type.INT)));
+    schema.setFields(Arrays.asList(
+        new Schema.Field("myintarray", optionalIntArray, null, null)
+    ));
+    testRoundTripConversion(
+        NEW_BEHAVIOR, schema,
+        "message record1 {\n" +
+            "  required group myintarray (LIST) {\n" +
+            "    repeated group list {\n" +
+            "      optional int32 element;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}\n");
+  }
+
+  @Test
   public void testUnionOfTwoTypes() throws Exception {
     Schema schema = Schema.createRecord("record2", null, null, false);
     Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type
-        .NULL),
+            .NULL),
         Schema.create(Schema.Type.INT),
         Schema.create(Schema.Type.FLOAT)));
     schema.setFields(Arrays.asList(
@@ -224,6 +341,33 @@ public class TestAvroSchemaConverter {
 
   @Test
   public void testArrayOfOptionalRecords() throws Exception {
+    Schema innerRecord = Schema.createRecord("element", null, null, false);
+    Schema optionalString = optional(Schema.create(Schema.Type.STRING));
+    innerRecord.setFields(Lists.newArrayList(
+        new Schema.Field("s1", optionalString, null, NullNode.getInstance()),
+        new Schema.Field("s2", optionalString, null, NullNode.getInstance())
+    ));
+    Schema schema = Schema.createRecord("HasArray", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("myarray", Schema.createArray(optional(innerRecord)),
+            null, null)
+    ));
+    System.err.println("Avro schema: " + schema.toString(true));
+
+    testRoundTripConversion(NEW_BEHAVIOR, schema, "message HasArray {\n" +
+        "  required group myarray (LIST) {\n" +
+        "    repeated group list {\n" +
+        "      optional group element {\n" +
+        "        optional binary s1 (UTF8);\n" +
+        "        optional binary s2 (UTF8);\n" +
+        "      }\n" +
+        "    }\n" +
+        "  }\n" +
+        "}\n");
+  }
+
+  @Test
+  public void testArrayOfOptionalRecordsOldBehavior() throws Exception {
     Schema innerRecord = Schema.createRecord("InnerRecord", null, null, false);
     Schema optionalString = optional(Schema.create(Schema.Type.STRING));
     innerRecord.setFields(Lists.newArrayList(
@@ -233,7 +377,7 @@ public class TestAvroSchemaConverter {
     Schema schema = Schema.createRecord("HasArray", null, null, false);
     schema.setFields(Lists.newArrayList(
         new Schema.Field("myarray", Schema.createArray(optional(innerRecord)),
-            null, NullNode.getInstance())
+            null, null)
     ));
     System.err.println("Avro schema: " + schema.toString(true));
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index b558343..bea0237 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -24,8 +24,12 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
@@ -34,15 +38,16 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.junit.Test;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.parquet.avro.AvroTestUtil.optional;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -58,11 +63,13 @@ public class TestReadWrite {
   }
 
   private final boolean compat;
-  private final Configuration testConf = new Configuration(false);
+  private final Configuration testConf = new Configuration();
 
   public TestReadWrite(boolean compat) {
     this.compat = compat;
     this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+    testConf.setBoolean("parquet.avro.add-list-element-records", false);
+    testConf.setBoolean("parquet.avro.write-old-list-structure", false);
   }
 
   @Test
@@ -75,8 +82,11 @@ public class TestReadWrite {
     tmp.delete();
     Path file = new Path(tmp.getPath());
 
-    AvroParquetWriter<GenericRecord> writer =
-        new AvroParquetWriter<GenericRecord>(file, schema);
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(file)
+        .withSchema(schema)
+        .withConf(testConf)
+        .build();
 
     // Write a record with an empty array.
     List<Integer> emptyArray = new ArrayList<Integer>();
@@ -102,8 +112,11 @@ public class TestReadWrite {
     tmp.delete();
     Path file = new Path(tmp.getPath());
 
-    AvroParquetWriter<GenericRecord> writer = 
-        new AvroParquetWriter<GenericRecord>(file, schema);
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(file)
+        .withSchema(schema)
+        .withConf(testConf)
+        .build();
 
     // Write a record with an empty map.
     ImmutableMap emptyMap = new ImmutableMap.Builder<String, Integer>().build();
@@ -129,8 +142,11 @@ public class TestReadWrite {
     tmp.delete();
     Path file = new Path(tmp.getPath());
 
-    AvroParquetWriter<GenericRecord> writer =
-        new AvroParquetWriter<GenericRecord>(file, schema);
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(file)
+        .withSchema(schema)
+        .withConf(testConf)
+        .build();
 
     // Write a record with a null value
     Map<String, Integer> map = new HashMap<String, Integer>();
@@ -161,8 +177,11 @@ public class TestReadWrite {
     tmp.delete();
     Path file = new Path(tmp.getPath());
 
-    AvroParquetWriter<GenericRecord> writer =
-        new AvroParquetWriter<GenericRecord>(file, schema);
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(file)
+        .withSchema(schema)
+        .withConf(testConf)
+        .build();
 
     // Write a record with a null value
     Map<String, Integer> map = new HashMap<String, Integer>();
@@ -185,8 +204,11 @@ public class TestReadWrite {
     tmp.delete();
     Path file = new Path(tmp.getPath());
 
-    AvroParquetWriter<GenericRecord> writer = 
-        new AvroParquetWriter<GenericRecord>(file, schema);
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(file)
+        .withSchema(schema)
+        .withConf(testConf)
+        .build();
 
     // Write a record with a map with Utf8 keys.
     GenericData.Record record = new GenericRecordBuilder(schema)
@@ -212,8 +234,11 @@ public class TestReadWrite {
     tmp.delete();
     Path file = new Path(tmp.getPath());
     
-    AvroParquetWriter<GenericRecord> writer = new
-        AvroParquetWriter<GenericRecord>(file, schema);
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(file)
+        .withSchema(schema)
+        .withConf(testConf)
+        .build();
 
     GenericData.Record nestedRecord = new GenericRecordBuilder(
         schema.getField("mynestedrecord").schema())
@@ -229,6 +254,13 @@ public class TestReadWrite {
     List<Integer> emptyArray = new ArrayList<Integer>();
     ImmutableMap emptyMap = new ImmutableMap.Builder<String, Integer>().build();
 
+    Schema arrayOfOptionalIntegers = Schema.createArray(
+        optional(Schema.create(Schema.Type.INT)));
+    GenericData.Array<Integer> genericIntegerArrayWithNulls =
+        new GenericData.Array<Integer>(
+            arrayOfOptionalIntegers,
+            Arrays.asList(1, null, 2, null, 3));
+
     GenericData.Record record = new GenericRecordBuilder(schema)
         .set("mynull", null)
         .set("myboolean", true)
@@ -243,6 +275,7 @@ public class TestReadWrite {
         .set("myarray", genericIntegerArray)
         .set("myemptyarray", emptyArray)
         .set("myoptionalarray", genericIntegerArray)
+        .set("myarrayofoptional", genericIntegerArrayWithNulls)
         .set("mymap", ImmutableMap.of("a", 1, "b", 2))
         .set("myemptymap", emptyMap)
         .set("myfixed", genericFixed)
@@ -271,6 +304,7 @@ public class TestReadWrite {
     assertEquals(integerArray, nextRecord.get("myarray"));
     assertEquals(emptyArray, nextRecord.get("myemptyarray"));
     assertEquals(integerArray, nextRecord.get("myoptionalarray"));
+    assertEquals(genericIntegerArrayWithNulls, nextRecord.get("myarrayofoptional"));
     assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
     assertEquals(emptyMap, nextRecord.get("myemptymap"));
     assertEquals(genericFixed, nextRecord.get("myfixed"));
@@ -366,6 +400,22 @@ public class TestReadWrite {
         recordConsumer.endGroup();
         recordConsumer.endField("myoptionalarray", index++);
 
+        recordConsumer.startField("myarrayofoptional", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("list", 0);
+        for (Integer val : (Integer[]) record.get("myarrayofoptional")) {
+          recordConsumer.startGroup();
+          if (val != null) {
+            recordConsumer.startField("element", 0);
+            recordConsumer.addInteger(val);
+            recordConsumer.endField("element", 0);
+          }
+          recordConsumer.endGroup();
+        }
+        recordConsumer.endField("list", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myarrayofoptional", index++);
+
         recordConsumer.startField("myrecordarray", index);
         recordConsumer.startGroup();
         recordConsumer.startField("array", 0);
@@ -424,6 +474,7 @@ public class TestReadWrite {
     record.put("mynestedint", 1);
     record.put("myarray", new int[] {1, 2, 3});
     record.put("myoptionalarray", new int[]{1, 2, 3});
+    record.put("myarrayofoptional", new Integer[] {1, null, 2, null, 3});
     record.put("myrecordarraya", new int[] {1, 2, 3});
     record.put("myrecordarrayb", new int[] {4, 5, 6});
     record.put("mymap", ImmutableMap.of("a", 1, "b", 2));
@@ -439,6 +490,7 @@ public class TestReadWrite {
         .set("mynestedint", 1).build();
 
     List<Integer> integerArray = Arrays.asList(1, 2, 3);
+    List<Integer> integerArrayWithNulls = Arrays.asList(1, null, 2, null, 3);
 
     Schema recordArraySchema = Schema.createRecord("array", null, null, false);
     recordArraySchema.setFields(Arrays.asList(
@@ -470,6 +522,7 @@ public class TestReadWrite {
     assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
     assertEquals(integerArray, nextRecord.get("myarray"));
     assertEquals(integerArray, nextRecord.get("myoptionalarray"));
+    assertEquals(integerArrayWithNulls, nextRecord.get("myarrayofoptional"));
     assertEquals(genericRecordArray, nextRecord.get("myrecordarray"));
     assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
     assertEquals(genericFixed, nextRecord.get("myfixed"));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java
new file mode 100644
index 0000000..5dd58f8
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java
@@ -0,0 +1,588 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.parquet.avro.AvroTestUtil.array;
+import static org.apache.parquet.avro.AvroTestUtil.optional;
+import static org.apache.parquet.avro.AvroTestUtil.optionalField;
+import static org.apache.parquet.avro.AvroTestUtil.primitive;
+import static org.apache.parquet.avro.AvroTestUtil.record;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+public class TestReadWriteOldBehavior {
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] {
+        { false },  // use the new converters
+        { true } }; // use the old converters
+    return Arrays.asList(data);
+  }
+
+  private final boolean compat;
+  private final Configuration testConf = new Configuration(false);
+
+  public TestReadWriteOldBehavior(boolean compat) {
+    this.compat = compat;
+    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+  }
+
+  @Test
+  public void testEmptyArray() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("array.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with an empty array.
+    List<Integer> emptyArray = new ArrayList<Integer>();
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("myarray", emptyArray).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(emptyArray, nextRecord.get("myarray"));
+  }
+
+  @Test
+  public void testEmptyMap() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer = 
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with an empty map.
+    ImmutableMap emptyMap = new ImmutableMap.Builder<String, Integer>().build();
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", emptyMap).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(emptyMap, nextRecord.get("mymap"));
+  }
+
+  @Test
+  public void testMapWithNulls() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map_with_nulls.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a null value
+    Map<String, Integer> map = new HashMap<String, Integer>();
+    map.put("thirty-four", 34);
+    map.put("eleventy-one", null);
+    map.put("one-hundred", 100);
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", map).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(map, nextRecord.get("mymap"));
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testMapRequiredValueWithNull() throws Exception {
+    Schema schema = Schema.createRecord("record1", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a null value
+    Map<String, Integer> map = new HashMap<String, Integer>();
+    map.put("thirty-four", 34);
+    map.put("eleventy-one", null);
+    map.put("one-hundred", 100);
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", map).build();
+    writer.write(record);
+  }
+
+  @Test
+  public void testMapWithUtf8Key() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a map with Utf8 keys.
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", ImmutableMap.of(new Utf8("a"), 1, new Utf8("b"), 2))
+        .build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+  }
+
+  @Test
+  public void testAll() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("all.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    GenericData.Record nestedRecord = new GenericRecordBuilder(
+        schema.getField("mynestedrecord").schema())
+            .set("mynestedint", 1).build();
+
+    List<Integer> integerArray = Arrays.asList(1, 2, 3);
+    GenericData.Array<Integer> genericIntegerArray = new GenericData.Array<Integer>(
+        Schema.createArray(Schema.create(Schema.Type.INT)), integerArray);
+
+    GenericFixed genericFixed = new GenericData.Fixed(
+        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
+
+    List<Integer> emptyArray = new ArrayList<Integer>();
+    ImmutableMap<String, Integer> emptyMap = new ImmutableMap.Builder<String, Integer>().build();
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mynull", null)
+        .set("myboolean", true)
+        .set("myint", 1)
+        .set("mylong", 2L)
+        .set("myfloat", 3.1f)
+        .set("mydouble", 4.1)
+        .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
+        .set("mystring", "hello")
+        .set("mynestedrecord", nestedRecord)
+        .set("myenum", "a")
+        .set("myarray", genericIntegerArray)
+        .set("myemptyarray", emptyArray)
+        .set("myoptionalarray", genericIntegerArray)
+        .set("myarrayofoptional", genericIntegerArray)
+        .set("mymap", ImmutableMap.of("a", 1, "b", 2))
+        .set("myemptymap", emptyMap)
+        .set("myfixed", genericFixed)
+        .build();
+
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
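+    // the old (compat) converters read enum values back as plain Strings,
+    // while the new converters return them as GenericData.EnumSymbol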
+    Object expectedEnumSymbol = compat ? "a" :
+        new GenericData.EnumSymbol(schema.getField("myenum").schema(), "a");
+
+    assertNotNull(nextRecord);
+    assertEquals(null, nextRecord.get("mynull"));
+    assertEquals(true, nextRecord.get("myboolean"));
+    assertEquals(1, nextRecord.get("myint"));
+    assertEquals(2L, nextRecord.get("mylong"));
+    assertEquals(3.1f, nextRecord.get("myfloat"));
+    assertEquals(4.1, nextRecord.get("mydouble"));
+    assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
+    assertEquals("hello", nextRecord.get("mystring"));
+    assertEquals(expectedEnumSymbol, nextRecord.get("myenum"));
+    assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
+    assertEquals(integerArray, nextRecord.get("myarray"));
+    assertEquals(emptyArray, nextRecord.get("myemptyarray"));
+    assertEquals(integerArray, nextRecord.get("myoptionalarray"));
+    assertEquals(integerArray, nextRecord.get("myarrayofoptional"));
+    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+    assertEquals(emptyMap, nextRecord.get("myemptymap"));
+    assertEquals(genericFixed, nextRecord.get("myfixed"));
+  }
+
+  @Test
+  public void testArrayWithNullValues() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("all.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    GenericData.Record nestedRecord = new GenericRecordBuilder(
+        schema.getField("mynestedrecord").schema())
+        .set("mynestedint", 1).build();
+
+    List<Integer> integerArray = Arrays.asList(1, 2, 3);
+    GenericData.Array<Integer> genericIntegerArray = new GenericData.Array<Integer>(
+        Schema.createArray(Schema.create(Schema.Type.INT)), integerArray);
+
+    GenericFixed genericFixed = new GenericData.Fixed(
+        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
+
+    List<Integer> emptyArray = new ArrayList<Integer>();
+    ImmutableMap<String, Integer> emptyMap = new ImmutableMap.Builder<String, Integer>().build();
+
+    Schema arrayOfOptionalIntegers = Schema.createArray(
+        optional(Schema.create(Schema.Type.INT)));
+    GenericData.Array<Integer> genericIntegerArrayWithNulls =
+        new GenericData.Array<Integer>(
+            arrayOfOptionalIntegers,
+            Arrays.asList(1, null, 2, null, 3));
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mynull", null)
+        .set("myboolean", true)
+        .set("myint", 1)
+        .set("mylong", 2L)
+        .set("myfloat", 3.1f)
+        .set("mydouble", 4.1)
+        .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
+        .set("mystring", "hello")
+        .set("mynestedrecord", nestedRecord)
+        .set("myenum", "a")
+        .set("myarray", genericIntegerArray)
+        .set("myemptyarray", emptyArray)
+        .set("myoptionalarray", genericIntegerArray)
+        .set("myarrayofoptional", genericIntegerArrayWithNulls)
+        .set("mymap", ImmutableMap.of("a", 1, "b", 2))
+        .set("myemptymap", emptyMap)
+        .set("myfixed", genericFixed)
+        .build();
+
+    final AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    try {
+      writer.write(record);
+      fail("Should not succeed writing an array with null values");
+    } catch (Exception e) {
+      // expected
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test
+  public void testAllUsingDefaultAvroSchema() throws Exception {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    // write file using Parquet APIs
+    ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<Map<String, Object>>(file,
+        new WriteSupport<Map<String, Object>>() {
+
+      private RecordConsumer recordConsumer;
+
+      @Override
+      public WriteContext init(Configuration configuration) {
+        return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
+            new HashMap<String, String>());
+      }
+
+      @Override
+      public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.recordConsumer = recordConsumer;
+      }
+
+      @Override
+      public void write(Map<String, Object> record) {
+        recordConsumer.startMessage();
+
+        int index = 0;
+
+        recordConsumer.startField("myboolean", index);
+        recordConsumer.addBoolean((Boolean) record.get("myboolean"));
+        recordConsumer.endField("myboolean", index++);
+
+        recordConsumer.startField("myint", index);
+        recordConsumer.addInteger((Integer) record.get("myint"));
+        recordConsumer.endField("myint", index++);
+
+        recordConsumer.startField("mylong", index);
+        recordConsumer.addLong((Long) record.get("mylong"));
+        recordConsumer.endField("mylong", index++);
+
+        recordConsumer.startField("myfloat", index);
+        recordConsumer.addFloat((Float) record.get("myfloat"));
+        recordConsumer.endField("myfloat", index++);
+
+        recordConsumer.startField("mydouble", index);
+        recordConsumer.addDouble((Double) record.get("mydouble"));
+        recordConsumer.endField("mydouble", index++);
+
+        recordConsumer.startField("mybytes", index);
+        recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
+        recordConsumer.endField("mybytes", index++);
+
+        recordConsumer.startField("mystring", index);
+        recordConsumer.addBinary(Binary.fromString((String) record.get("mystring")));
+        recordConsumer.endField("mystring", index++);
+
+        recordConsumer.startField("mynestedrecord", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("mynestedint", 0);
+        recordConsumer.addInteger((Integer) record.get("mynestedint"));
+        recordConsumer.endField("mynestedint", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("mynestedrecord", index++);
+
+        recordConsumer.startField("myenum", index);
+        recordConsumer.addBinary(Binary.fromString((String) record.get("myenum")));
+        recordConsumer.endField("myenum", index++);
+
+        recordConsumer.startField("myarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        for (int val : (int[]) record.get("myarray")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myarray", index++);
+
+        recordConsumer.startField("myoptionalarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        for (int val : (int[]) record.get("myoptionalarray")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myoptionalarray", index++);
+
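+        // 3-level list: the LIST group wraps a repeated group named "list",
+        // and each repeated instance holds one optional "element" value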
+        recordConsumer.startField("myarrayofoptional", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("list", 0);
+        for (Integer val : (Integer[]) record.get("myarrayofoptional")) {
+          recordConsumer.startGroup();
+          if (val != null) {
+            recordConsumer.startField("element", 0);
+            recordConsumer.addInteger(val);
+            recordConsumer.endField("element", 0);
+          }
+          recordConsumer.endGroup();
+        }
+        recordConsumer.endField("list", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myarrayofoptional", index++);
+
+        recordConsumer.startField("myrecordarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        recordConsumer.startGroup();
+        recordConsumer.startField("a", 0);
+        for (int val : (int[]) record.get("myrecordarraya")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("a", 0);
+        recordConsumer.startField("b", 1);
+        for (int val : (int[]) record.get("myrecordarrayb")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("b", 1);
+        recordConsumer.endGroup();
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myrecordarray", index++);
+
+        recordConsumer.startField("mymap", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("map", 0);
+        recordConsumer.startGroup();
+        Map<String, Integer> mymap = (Map<String, Integer>) record.get("mymap");
+        recordConsumer.startField("key", 0);
+        for (String key : mymap.keySet()) {
+          recordConsumer.addBinary(Binary.fromString(key));
+        }
+        recordConsumer.endField("key", 0);
+        recordConsumer.startField("value", 1);
+        for (int val : mymap.values()) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("value", 1);
+        recordConsumer.endGroup();
+        recordConsumer.endField("map", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("mymap", index++);
+
+        recordConsumer.startField("myfixed", index);
+        recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
+        recordConsumer.endField("myfixed", index++);
+
+        recordConsumer.endMessage();
+      }
+    });
+    Map<String, Object> record = new HashMap<String, Object>();
+    record.put("myboolean", true);
+    record.put("myint", 1);
+    record.put("mylong", 2L);
+    record.put("myfloat", 3.1f);
+    record.put("mydouble", 4.1);
+    record.put("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)));
+    record.put("mystring", "hello");
+    record.put("myenum", "a");
+    record.put("mynestedint", 1);
+    record.put("myarray", new int[] {1, 2, 3});
+    record.put("myoptionalarray", new int[]{1, 2, 3});
+    record.put("myarrayofoptional", new Integer[] {1, null, 2, null, 3});
+    record.put("myrecordarraya", new int[] {1, 2, 3});
+    record.put("myrecordarrayb", new int[] {4, 5, 6});
+    record.put("mymap", ImmutableMap.of("a", 1, "b", 2));
+    record.put("myfixed", new byte[] { (byte) 65 });
+    parquetWriter.write(record);
+    parquetWriter.close();
+
+    Schema nestedRecordSchema = Schema.createRecord("mynestedrecord", null, null, false);
+    nestedRecordSchema.setFields(Arrays.asList(
+        new Schema.Field("mynestedint", Schema.create(Schema.Type.INT), null, null)
+    ));
+    GenericData.Record nestedRecord = new GenericRecordBuilder(nestedRecordSchema)
+        .set("mynestedint", 1).build();
+
+    List<Integer> integerArray = Arrays.asList(1, 2, 3);
+
+    Schema recordArraySchema = Schema.createRecord("array", null, null, false);
+    recordArraySchema.setFields(Arrays.asList(
+        new Schema.Field("a", Schema.create(Schema.Type.INT), null, null),
+        new Schema.Field("b", Schema.create(Schema.Type.INT), null, null)
+    ));
+    GenericRecordBuilder builder = new GenericRecordBuilder(recordArraySchema);
+    List<GenericData.Record> recordArray = new ArrayList<GenericData.Record>();
+    recordArray.add(builder.set("a", 1).set("b", 4).build());
+    recordArray.add(builder.set("a", 2).set("b", 5).build());
+    recordArray.add(builder.set("a", 3).set("b", 6).build());
+    GenericData.Array<GenericData.Record> genericRecordArray = new GenericData.Array<GenericData.Record>(
+        Schema.createArray(recordArraySchema), recordArray);
+
+    GenericFixed genericFixed = new GenericData.Fixed(
+        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
+
+    // 3-level lists are deserialized with the extra layer present
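+    // (each value is read back as a single-field "list" record wrapping an
+    // optional "element", rather than as a bare nullable int)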
+    Schema elementSchema = record("list",
+        optionalField("element", primitive(Schema.Type.INT)));
+    GenericRecordBuilder elementBuilder = new GenericRecordBuilder(elementSchema);
+    GenericData.Array<GenericData.Record> genericRecordArrayWithNullIntegers =
+        new GenericData.Array<GenericData.Record>(array(elementSchema),
+            Arrays.asList(
+                elementBuilder.set("element", 1).build(),
+                elementBuilder.set("element", null).build(),
+                elementBuilder.set("element", 2).build(),
+                elementBuilder.set("element", null).build(),
+                elementBuilder.set("element", 3).build()
+            ));
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+    assertNotNull(nextRecord);
+    assertEquals(true, nextRecord.get("myboolean"));
+    assertEquals(1, nextRecord.get("myint"));
+    assertEquals(2L, nextRecord.get("mylong"));
+    assertEquals(3.1f, nextRecord.get("myfloat"));
+    assertEquals(4.1, nextRecord.get("mydouble"));
+    assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
+    assertEquals("hello", nextRecord.get("mystring"));
+    assertEquals("a", nextRecord.get("myenum"));
+    assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
+    assertEquals(integerArray, nextRecord.get("myarray"));
+    assertEquals(integerArray, nextRecord.get("myoptionalarray"));
+    assertEquals(genericRecordArrayWithNullIntegers, nextRecord.get("myarrayofoptional"));
+    assertEquals(genericRecordArray, nextRecord.get("myrecordarray"));
+    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+    assertEquals(genericFixed, nextRecord.get("myfixed"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/test/resources/all.avsc
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/resources/all.avsc b/parquet-avro/src/test/resources/all.avsc
index 59fd91c..2bf3b16 100644
--- a/parquet-avro/src/test/resources/all.avsc
+++ b/parquet-avro/src/test/resources/all.avsc
@@ -64,6 +64,12 @@
        "items" : "int"
     }]
   }, {
+    "name" : "myarrayofoptional",
+    "type" : {
+       "type" : "array",
+       "items" : [ "null", "int" ]
+    }
+  }, {
     "name" : "mymap",
     "type" : {
       "type" : "map",

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/test/resources/allFromParquet.avsc
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/resources/allFromParquet.avsc b/parquet-avro/src/test/resources/allFromParquet.avsc
deleted file mode 100644
index 6f6f97f..0000000
--- a/parquet-avro/src/test/resources/allFromParquet.avsc
+++ /dev/null
@@ -1,85 +0,0 @@
-{
-  "name" : "myrecord",
-  "namespace": "org.apache.parquet.avro",
-  "type" : "record",
-  "fields" : [ {
-    "name" : "myboolean",
-    "type" : "boolean"
-  }, {
-    "name" : "myint",
-    "type" : "int"
-  }, {
-    "name" : "mylong",
-    "type" : "long"
-  }, {
-    "name" : "myfloat",
-    "type" : "float"
-  }, {
-    "name" : "mydouble",
-    "type" : "double"
-  }, {
-    "name" : "mybytes",
-    "type" : "bytes"
-  }, {
-    "name" : "mystring",
-    "type" : "string"
-  }, {
-    "name" : "mynestedrecord",
-    "type" : {
-      "type" : "record",
-      "name" : "mynestedrecord",
-      "namespace" : "",
-      "fields" : [ {
-        "name" : "mynestedint",
-        "type" : "int"
-      } ]
-    }
-  }, {
-    "name" : "myenum",
-    "type" : "string"
-  }, {
-    "name" : "myarray",
-    "type" : {
-      "type" : "array",
-      "items" : "int"
-    }
-  }, {
-    "name" : "myoptionalarray",
-    "type" : [ "null", {
-       "type" : "array",
-       "items" : "int"
-    }],
-    "default" : null
-  }, {
-    "name" : "myrecordarray",
-    "type" : {
-      "type" : "array",
-      "items" : {
-        "type" : "record",
-        "name" : "array",
-        "namespace" : "",
-        "fields" : [ {
-           "name" : "a",
-           "type" : "int"
-        }, {
-           "name" : "b",
-           "type" : "int"
-        } ]
-      }
-    }
-  }, {
-    "name" : "mymap",
-    "type" : {
-      "type" : "map",
-      "values" : "int"
-    }
-  }, {
-    "name" : "myfixed",
-    "type" : {
-      "type" : "fixed",
-      "name" : "myfixed",
-      "namespace" : "",
-      "size" : 1
-    }
-  } ]
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/test/resources/allFromParquetNewBehavior.avsc
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/resources/allFromParquetNewBehavior.avsc b/parquet-avro/src/test/resources/allFromParquetNewBehavior.avsc
new file mode 100644
index 0000000..65b30a6
--- /dev/null
+++ b/parquet-avro/src/test/resources/allFromParquetNewBehavior.avsc
@@ -0,0 +1,91 @@
+{
+  "name" : "myrecord",
+  "namespace": "org.apache.parquet.avro",
+  "type" : "record",
+  "fields" : [ {
+    "name" : "myboolean",
+    "type" : "boolean"
+  }, {
+    "name" : "myint",
+    "type" : "int"
+  }, {
+    "name" : "mylong",
+    "type" : "long"
+  }, {
+    "name" : "myfloat",
+    "type" : "float"
+  }, {
+    "name" : "mydouble",
+    "type" : "double"
+  }, {
+    "name" : "mybytes",
+    "type" : "bytes"
+  }, {
+    "name" : "mystring",
+    "type" : "string"
+  }, {
+    "name" : "mynestedrecord",
+    "type" : {
+      "type" : "record",
+      "name" : "mynestedrecord",
+      "namespace" : "",
+      "fields" : [ {
+        "name" : "mynestedint",
+        "type" : "int"
+      } ]
+    }
+  }, {
+    "name" : "myenum",
+    "type" : "string"
+  }, {
+    "name" : "myarray",
+    "type" : {
+      "type" : "array",
+      "items" : "int"
+    }
+  }, {
+    "name" : "myoptionalarray",
+    "type" : [ "null", {
+       "type" : "array",
+       "items" : "int"
+    }],
+    "default" : null
+  }, {
+    "name" : "myarrayofoptional",
+    "type" : {
+       "type" : "array",
+       "items" : ["null", "int"]
+    }
+  }, {
+    "name" : "myrecordarray",
+    "type" : {
+      "type" : "array",
+      "items" : {
+        "type" : "record",
+        "name" : "array",
+        "namespace" : "",
+        "fields" : [ {
+           "name" : "a",
+           "type" : "int"
+        }, {
+           "name" : "b",
+           "type" : "int"
+        } ]
+      }
+    }
+  }, {
+    "name" : "mymap",
+    "type" : {
+      "type" : "map",
+      "values" : "int"
+    }
+  }, {
+    "name" : "myfixed",
+    "type" : {
+      "type" : "fixed",
+      "name" : "myfixed",
+      "namespace" : "",
+      "size" : 1
+    }
+  } ]
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-avro/src/test/resources/allFromParquetOldBehavior.avsc
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/resources/allFromParquetOldBehavior.avsc b/parquet-avro/src/test/resources/allFromParquetOldBehavior.avsc
new file mode 100644
index 0000000..d8a3202
--- /dev/null
+++ b/parquet-avro/src/test/resources/allFromParquetOldBehavior.avsc
@@ -0,0 +1,100 @@
+{
+  "name" : "myrecord",
+  "namespace": "org.apache.parquet.avro",
+  "type" : "record",
+  "fields" : [ {
+    "name" : "myboolean",
+    "type" : "boolean"
+  }, {
+    "name" : "myint",
+    "type" : "int"
+  }, {
+    "name" : "mylong",
+    "type" : "long"
+  }, {
+    "name" : "myfloat",
+    "type" : "float"
+  }, {
+    "name" : "mydouble",
+    "type" : "double"
+  }, {
+    "name" : "mybytes",
+    "type" : "bytes"
+  }, {
+    "name" : "mystring",
+    "type" : "string"
+  }, {
+    "name" : "mynestedrecord",
+    "type" : {
+      "type" : "record",
+      "name" : "mynestedrecord",
+      "namespace" : "",
+      "fields" : [ {
+        "name" : "mynestedint",
+        "type" : "int"
+      } ]
+    }
+  }, {
+    "name" : "myenum",
+    "type" : "string"
+  }, {
+    "name" : "myarray",
+    "type" : {
+      "type" : "array",
+      "items" : "int"
+    }
+  }, {
+    "name" : "myoptionalarray",
+    "type" : [ "null", {
+       "type" : "array",
+       "items" : "int"
+    }],
+    "default" : null
+  }, {
+    "name" : "myarrayofoptional",
+    "type" : {
+       "type" : "array",
+       "items" : {
+          "type": "record",
+          "name": "list",
+          "namespace": "",
+          "fields": [ {
+            "name": "element",
+            "type": ["null", "int"],
+            "default": null
+          } ]
+       }
+    }
+  }, {
+    "name" : "myrecordarray",
+    "type" : {
+      "type" : "array",
+      "items" : {
+        "type" : "record",
+        "name" : "array",
+        "namespace" : "",
+        "fields" : [ {
+           "name" : "a",
+           "type" : "int"
+        }, {
+           "name" : "b",
+           "type" : "int"
+        } ]
+      }
+    }
+  }, {
+    "name" : "mymap",
+    "type" : {
+      "type" : "map",
+      "values" : "int"
+    }
+  }, {
+    "name" : "myfixed",
+    "type" : {
+      "type" : "fixed",
+      "name" : "myfixed",
+      "namespace" : "",
+      "size" : 1
+    }
+  } ]
+}
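
Under the old read behavior the extra 3-level layer stays visible from Avro:
each value of "myarrayofoptional" is read as the single-field "list" record
above, holding a nullable "element", rather than as a bare nullable int
(compare the same field in allFromParquetNewBehavior.avsc). The record shape
mirrors the underlying Parquet list structure, roughly:

    required group myarrayofoptional (LIST) {
      repeated group list {
        optional int32 element;
      }
    }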

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d6f082b9/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
index 0f68fc3..64534e1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.schema;
 
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type.Repetition;
 
@@ -28,6 +29,9 @@ import static org.apache.parquet.schema.OriginalType.*;
  * to equivalent Parquet types.
  */
 public abstract class ConversionPatterns {
+
+  private static final String ELEMENT_NAME = "element";
+
   /**
    * to preserve the difference between empty list and null when optional
    *
@@ -92,7 +96,9 @@ public abstract class ConversionPatterns {
    * @param alias      name of the field
    * @param nestedType
    * @return
+   * @deprecated use listOfElements instead
    */
+  @Deprecated
   public static GroupType listType(Repetition repetition, String alias, Type nestedType) {
     return listWrapper(
             repetition,
@@ -101,4 +107,26 @@ public abstract class ConversionPatterns {
             nestedType
     );
   }
+
+  /**
+   * Creates a 3-level list structure, annotated with LIST, whose elements have
+   * the given elementType. The repeated level is inserted automatically, and
+   * the elementType's repetition should match the elements themselves:
+   * required for non-null elements and optional for nullable elements.
+   *
+   * @param listRepetition the repetition of the entire list structure
+   * @param name the name of the list structure type
+   * @param elementType the type of elements contained by the list
+   * @return a GroupType that represents the list
+   */
+  public static GroupType listOfElements(Repetition listRepetition, String name, Type elementType) {
+    Preconditions.checkArgument(elementType.getName().equals(ELEMENT_NAME),
+        "List element type must be named 'element'");
+    return listWrapper(
+        listRepetition,
+        name,
+        LIST,
+        new GroupType(Repetition.REPEATED, "list", elementType)
+    );
+  }
 }
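
For illustration, here is a minimal sketch of calling the new helper (the
field name below is only an example; the element type must be named "element"
to satisfy the precondition above):

    import org.apache.parquet.schema.ConversionPatterns;
    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type.Repetition;

    // a nullable list whose elements may themselves be null
    GroupType list = ConversionPatterns.listOfElements(
        Repetition.OPTIONAL, "myarrayofoptional",
        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "element"));

This builds an optional group annotated with LIST that wraps a repeated "list"
group around the optional int32 "element" field, the same shape as the
"myarrayofoptional" structure shown earlier.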

