parquet-commits mailing list archives

From b...@apache.org
Subject [2/2] parquet-mr git commit: PARQUET-286: Update String support to match upstream Avro.
Date Thu, 04 Jun 2015 17:46:18 GMT
PARQUET-286: Update String support to match upstream Avro.

This adds getStringableClass, which determines which String
representation upstream Avro would use. If java-class is set, specific
and reflect use an alternative String class that is instantiated
through a constructor taking a single String. Otherwise, reflect always
uses String, while specific and generic use Utf8 or String depending on
whether avro.java.string is set to "String".
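
For reference, a condensed sketch of that selection logic (the full
version is getStringableClass in the AvroRecordConverter.java diff
below; imports and property names match the patch):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.specific.SpecificData;
    import org.apache.avro.util.ClassUtils;

    class StringRepresentationSketch {
      static Class<?> chooseStringClass(Schema schema, GenericData model) {
        if (model instanceof SpecificData) {
          // specific and reflect (a SpecificData subclass) honor java-class,
          // or java-key-class when choosing a map key representation
          boolean isMap = (schema.getType() == Schema.Type.MAP);
          String className = schema.getProp(isMap ? "java-key-class" : "java-class");
          if (className != null) {
            try {
              return ClassUtils.forName(model.getClassLoader(), className);
            } catch (ClassNotFoundException e) {
              // class not on the classpath: fall through to the defaults below
            }
          }
        }
        if (model instanceof ReflectData) {
          return String.class; // reflect never uses Utf8
        }
        // generic and specific consult avro.java.string; anything other
        // than "String" yields CharSequence, materialized as Utf8
        return "String".equals(schema.getProp("avro.java.string"))
            ? String.class : CharSequence.class;
      }
    }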

The new string representations required two new converters: one for Utf8
and one for stringable classes (those with constructors that take a
single String). The converters have also been refactored so that all
binary converters now implement dictionary support.
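
As a small, self-contained illustration of the stringable contract (not
part of the patch; java.math.BigDecimal is just a convenient example):
a stringable class is any class exposing a public constructor from
String, which the new converter looks up once and then invokes per
value. The dictionary path reuses the same conversion, precomputing one
instance per dictionary id.

    import java.lang.reflect.Constructor;
    import java.math.BigDecimal;

    public class StringableDemo {
      public static void main(String[] args) throws Exception {
        // FieldStringableConverter caches a Constructor like this one
        Constructor<BigDecimal> ctor = BigDecimal.class.getConstructor(String.class);
        // ... and invokes it with the column value decoded as a UTF-8 String
        BigDecimal value = ctor.newInstance("3.14");
        System.out.println(value.add(BigDecimal.ONE)); // prints 4.14
      }
    }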

Author: Ryan Blue <blue@apache.org>

Closes #201 from rdblue/PARQUET-286-avro-utf8-support and squashes the following commits:

beb5a44 [Ryan Blue] PARQUET-286: Add tests, support for stringable map keys.
0e9240f [Ryan Blue] PARQUET-286: Update string support to match upstream Avro.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/918609f2
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/918609f2
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/918609f2

Branch: refs/heads/master
Commit: 918609f2cc4e4de95445ce4fdd7dc952b9625017
Parents: d6f082b
Author: Ryan Blue <blue@apache.org>
Authored: Thu Jun 4 10:45:50 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Thu Jun 4 10:45:50 2015 -0700

----------------------------------------------------------------------
 parquet-avro/pom.xml                            |  16 +
 .../org/apache/parquet/avro/AvroConverters.java | 118 +++-
 .../parquet/avro/AvroRecordConverter.java       | 112 +++-
 parquet-avro/src/test/avro/stringBehavior.avsc  |  35 ++
 .../parquet/avro/TestBackwardCompatibility.java |   3 +-
 .../org/apache/parquet/avro/TestReadWrite.java  |  27 +-
 .../parquet/avro/TestReadWriteOldBehavior.java  | 588 ------------------
 .../avro/TestReadWriteOldListBehavior.java      | 594 +++++++++++++++++++
 .../parquet/avro/TestReflectReadWrite.java      |  21 +-
 .../apache/parquet/avro/TestStringBehavior.java | 363 ++++++++++++
 10 files changed, 1221 insertions(+), 656 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 535bb85..ea4c37a 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -84,6 +84,14 @@
   </dependencies>
 
   <build>
+    <resources>
+      <resource>
+        <directory>src/test/avro</directory>
+      </resource>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
     <plugins>
       <plugin>
         <artifactId>maven-enforcer-plugin</artifactId>
@@ -97,7 +105,15 @@
         <artifactId>avro-maven-plugin</artifactId>
         <version>${avro.version}</version>
         <executions>
+          <execution>
+            <id>compile-avsc</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+          </execution>
             <execution>
+                <id>compile-idl</id>
                 <phase>generate-test-sources</phase>
                 <goals>
                     <goal>idl-protocol</goal>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
index f3cb1ec..817f074 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
@@ -18,10 +18,14 @@
  */
 package org.apache.parquet.avro;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
 import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
@@ -44,6 +48,40 @@ public class AvroConverters {
     }
   }
 
+  abstract static class BinaryConverter<T> extends AvroPrimitiveConverter {
+    private T[] dict = null;
+
+    public BinaryConverter(ParentValueContainer parent) {
+      super(parent);
+    }
+
+    public abstract T convert(Binary binary);
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.add(convert(value));
+    }
+
+    @Override
+    public boolean hasDictionarySupport() {
+      return true;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setDictionary(Dictionary dictionary) {
+      dict = (T[]) new Object[dictionary.getMaxId() + 1];
+      for (int i = 0; i <= dictionary.getMaxId(); i++) {
+        dict[i] = convert(dictionary.decodeToBinary(i));
+      }
+    }
+
+    @Override
+    public void addValueFromDictionary(int dictionaryId) {
+      parent.add(dict[dictionaryId]);
+    }
+  }
+
   static final class FieldByteConverter extends AvroPrimitiveConverter {
     public FieldByteConverter(ParentValueContainer parent) {
       super(parent);
@@ -54,6 +92,7 @@ public class AvroConverters {
       parent.addByte((byte) value);
     }
   }
+
   static final class FieldShortConverter extends AvroPrimitiveConverter {
     public FieldShortConverter(ParentValueContainer parent) {
       super(parent);
@@ -133,7 +172,6 @@ public class AvroConverters {
     final public void addFloat(float value) {
       parent.addFloat(value);
     }
-
   }
 
   static final class FieldDoubleConverter extends AvroPrimitiveConverter {
@@ -162,62 +200,84 @@ public class AvroConverters {
     }
   }
 
-  static final class FieldByteArrayConverter extends AvroPrimitiveConverter {
+  static final class FieldByteArrayConverter extends BinaryConverter<byte[]> {
     public FieldByteArrayConverter(ParentValueContainer parent) {
       super(parent);
     }
 
     @Override
-    final public void addBinary(Binary value) {
-      parent.add(value.getBytes());
+    public byte[] convert(Binary binary) {
+      return binary.getBytes();
     }
   }
 
-  static final class FieldByteBufferConverter extends AvroPrimitiveConverter {
+  static final class FieldByteBufferConverter extends BinaryConverter<ByteBuffer> {
     public FieldByteBufferConverter(ParentValueContainer parent) {
       super(parent);
     }
 
     @Override
-    final public void addBinary(Binary value) {
-      parent.add(ByteBuffer.wrap(value.getBytes()));
+    public ByteBuffer convert(Binary binary) {
+      return ByteBuffer.wrap(binary.getBytes());
     }
   }
 
-  static final class FieldStringConverter extends AvroPrimitiveConverter {
-    // TODO: dictionary support should be generic and provided by a parent
-    // TODO: this always produces strings, but should respect avro.java.string
-    private String[] dict;
-
+  static final class FieldStringConverter extends BinaryConverter<String> {
     public FieldStringConverter(ParentValueContainer parent) {
       super(parent);
     }
 
     @Override
-    final public void addBinary(Binary value) {
-      parent.add(value.toStringUsingUTF8());
+    public String convert(Binary binary) {
+      return binary.toStringUsingUTF8();
     }
+  }
 
-    @Override
-    public boolean hasDictionarySupport() {
-      return true;
+  static final class FieldUTF8Converter extends BinaryConverter<Utf8> {
+    public FieldUTF8Converter(ParentValueContainer parent) {
+      super(parent);
     }
 
     @Override
-    public void setDictionary(Dictionary dictionary) {
-      dict = new String[dictionary.getMaxId() + 1];
-      for (int i = 0; i <= dictionary.getMaxId(); i++) {
-        dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
+    public Utf8 convert(Binary binary) {
+      return new Utf8(binary.getBytes());
+    }
+  }
+
+  static final class FieldStringableConverter extends BinaryConverter<Object> {
+    private final String stringableName;
+    private final Constructor<?> ctor;
+
+    public FieldStringableConverter(ParentValueContainer parent,
+                                    Class<?> stringableClass) {
+      super(parent);
+      stringableName = stringableClass.getName();
+      try {
+        this.ctor = stringableClass.getConstructor(String.class);
+      } catch (NoSuchMethodException e) {
+        throw new ParquetDecodingException(
+            "Unable to get String constructor for " + stringableName, e);
       }
     }
 
     @Override
-    public void addValueFromDictionary(int dictionaryId) {
-      parent.add(dict[dictionaryId]);
+    public Object convert(Binary binary) {
+      try {
+        return ctor.newInstance(binary.toStringUsingUTF8());
+      } catch (InstantiationException e) {
+        throw new ParquetDecodingException(
+            "Cannot convert binary to " + stringableName, e);
+      } catch (IllegalAccessException e) {
+        throw new ParquetDecodingException(
+            "Cannot convert binary to " + stringableName, e);
+      } catch (InvocationTargetException e) {
+        throw new ParquetDecodingException(
+            "Cannot convert binary to " + stringableName, e);
+      }
     }
   }
 
-  static final class FieldEnumConverter extends AvroPrimitiveConverter {
+  static final class FieldEnumConverter extends BinaryConverter<Object> {
     private final Schema schema;
     private final GenericData model;
 
@@ -229,12 +289,12 @@ public class AvroConverters {
     }
 
     @Override
-    final public void addBinary(Binary value) {
-      parent.add(model.createEnum(value.toStringUsingUTF8(), schema));
+    public Object convert(Binary binary) {
+      return model.createEnum(binary.toStringUsingUTF8(), schema);
     }
   }
 
-  static final class FieldFixedConverter extends AvroPrimitiveConverter {
+  static final class FieldFixedConverter extends BinaryConverter<Object> {
     private final Schema schema;
     private final GenericData model;
 
@@ -246,8 +306,8 @@ public class AvroConverters {
     }
 
     @Override
-    final public void addBinary(Binary value) {
-      parent.add(model.createFixed(null /* reuse */, value.getBytes(), schema));
+    public Object convert(Binary binary) {
+      return model.createFixed(null /* reuse */, binary.getBytes(), schema);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index ed1b97e..57ad18a 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -26,6 +26,7 @@ import it.unimi.dsi.fastutil.floats.FloatArrayList;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.shorts.ShortArrayList;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -36,13 +37,15 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.Stringable;
 import org.apache.avro.specific.SpecificData;
+import org.apache.avro.util.ClassUtils;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.avro.AvroConverters.FieldStringConverter;
+import org.apache.parquet.avro.AvroConverters.FieldStringableConverter;
 import org.apache.parquet.io.InvalidRecordException;
-import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -59,6 +62,10 @@ import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
  */
 class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
 
+  private static final String STRINGABLE_PROP = "avro.java.string";
+  private static final String JAVA_CLASS_PROP = "java-class";
+  private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
+
   protected T currentRecord;
   private final Converter[] converters;
 
@@ -86,18 +93,40 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
         avroFieldIndexes.put(field.name(), avroFieldIndex++);
     }
 
+    Class<?> recordClass = null;
+    if (model instanceof ReflectData) {
+      recordClass = getDatumClass(avroSchema, model);
+    }
+
     int parquetFieldIndex = 0;
     for (Type parquetField: parquetSchema.getFields()) {
       final Schema.Field avroField = getAvroField(parquetField.getName());
       Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema());
       final int finalAvroIndex = avroFieldIndexes.remove(avroField.name());
-      converters[parquetFieldIndex++] = newConverter(
-          nonNullSchema, parquetField, this.model, new ParentValueContainer() {
+      ParentValueContainer container = new ParentValueContainer() {
         @Override
         public void add(Object value) {
           AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value);
         }
-      });
+      };
+      converters[parquetFieldIndex] = newConverter(
+          nonNullSchema, parquetField, this.model, container);
+
+      // @Stringable doesn't affect the reflected schema; must be enforced here
+      if (recordClass != null &&
+          converters[parquetFieldIndex] instanceof FieldStringConverter) {
+        try {
+          Field field = recordClass.getDeclaredField(avroField.name());
+          if (field.isAnnotationPresent(Stringable.class)) {
+            converters[parquetFieldIndex] = new FieldStringableConverter(
+                container, field.getType());
+          }
+        } catch (NoSuchFieldException e) {
+          // must not be stringable
+        }
+      }
+
+      parquetFieldIndex += 1;
     }
 
     // store defaults for any new Avro fields from avroSchema that are not in
@@ -163,7 +192,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
       }
       return new AvroConverters.FieldByteBufferConverter(parent);
     } else if (schema.getType().equals(Schema.Type.STRING)) {
-      return new AvroConverters.FieldStringConverter(parent);
+      return newStringConverter(schema, model, parent);
     } else if (schema.getType().equals(Schema.Type.RECORD)) {
       return new AvroRecordConverter(parent, type.asGroupType(), schema, model);
     } else if (schema.getType().equals(Schema.Type.ENUM)) {
@@ -188,6 +217,51 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
         "Cannot convert Avro type: %s to Parquet type: %s", schema, type));
   }
 
+  private static Converter newStringConverter(Schema schema, GenericData model,
+                                              ParentValueContainer parent) {
+    Class<?> stringableClass = getStringableClass(schema, model);
+    if (stringableClass == String.class) {
+      return new FieldStringConverter(parent);
+    } else if (stringableClass == CharSequence.class) {
+      return new AvroConverters.FieldUTF8Converter(parent);
+    }
+    return new FieldStringableConverter(parent, stringableClass);
+  }
+
+  private static Class<?> getStringableClass(Schema schema, GenericData model) {
+    if (model instanceof SpecificData) {
+      // both specific and reflect (and any subclasses) use this logic
+      boolean isMap = (schema.getType() == Schema.Type.MAP);
+      String stringableClass = schema.getProp(
+          isMap ? JAVA_KEY_CLASS_PROP : JAVA_CLASS_PROP);
+      if (stringableClass != null) {
+        try {
+          return ClassUtils.forName(model.getClassLoader(), stringableClass);
+        } catch (ClassNotFoundException e) {
+          // not available, use a String instead
+        }
+      }
+    }
+
+    if (ReflectData.class.isAssignableFrom(model.getClass())) {
+      // reflect uses String, not Utf8
+      return String.class;
+    }
+
+    // generic and specific use the avro.java.string setting
+    String name = schema.getProp(STRINGABLE_PROP);
+    if (name == null) {
+      return CharSequence.class;
+    }
+
+    switch (GenericData.StringType.valueOf(name)) {
+      case String:
+        return String.class;
+      default:
+        return CharSequence.class; // will use Utf8
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private static <T> Class<T> getDatumClass(Schema schema, GenericData model) {
     if (model instanceof SpecificData) {
@@ -733,13 +807,13 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
   }
 
-  static final class MapConverter<V> extends GroupConverter {
+  static final class MapConverter<K, V> extends GroupConverter {
 
     private final ParentValueContainer parent;
     private final Converter keyValueConverter;
     private final Schema schema;
     private final Class<?> mapClass;
-    private Map<String, V> map;
+    private Map<K, V> map;
 
     public MapConverter(ParentValueContainer parent, GroupType mapType,
         Schema mapSchema, GenericData model) {
@@ -767,29 +841,31 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
 
     @SuppressWarnings("unchecked")
-    private Map<String, V> newMap() {
+    private Map<K, V> newMap() {
       if (mapClass == null || mapClass.isAssignableFrom(HashMap.class)) {
-        return new HashMap<String, V>();
+        return new HashMap<K, V>();
       } else {
-        return (Map<String, V>) ReflectData.newInstance(mapClass, schema);
+        return (Map<K, V>) ReflectData.newInstance(mapClass, schema);
       }
     }
 
     final class MapKeyValueConverter extends GroupConverter {
 
-      private String key;
+      private K key;
       private V value;
       private final Converter keyConverter;
       private final Converter valueConverter;
 
       public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema,
           GenericData model) {
-        keyConverter = new PrimitiveConverter() {
-          @Override
-          final public void addBinary(Binary value) {
-            key = value.toStringUsingUTF8();
-          }
-        };
+        keyConverter = newStringConverter(mapSchema, model,
+            new ParentValueContainer() {
+              @Override
+              @SuppressWarnings("unchecked")
+              public void add(Object value) {
+                MapKeyValueConverter.this.key = (K) value;
+              }
+            });
 
         Type valueType = keyValueType.getType(1);
         Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType());

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/src/test/avro/stringBehavior.avsc
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/avro/stringBehavior.avsc b/parquet-avro/src/test/avro/stringBehavior.avsc
new file mode 100644
index 0000000..7787b59
--- /dev/null
+++ b/parquet-avro/src/test/avro/stringBehavior.avsc
@@ -0,0 +1,35 @@
+{
+  "name" : "StringBehaviorTest",
+  "namespace": "org.apache.parquet.avro",
+  "type" : "record",
+  "fields" : [ {
+    "name" : "default_class",
+    "type" : "string"
+  }, {
+    "name" : "string_class",
+    "type" : {"type": "string", "avro.java.string": "String"}
+  }, {
+    "name" : "stringable_class",
+    "type" : {"type": "string", "java-class": "java.math.BigDecimal"}
+  }, {
+    "name" : "default_map",
+    "type" : {
+      "type" : "map",
+      "values" : "int"
+    }
+  }, {
+    "name" : "string_map",
+    "type" : {
+      "type" : "map",
+      "values" : "int",
+      "avro.java.string": "String"
+    }
+  }, {
+    "name" : "stringable_map",
+    "type" : {
+      "type" : "map",
+      "values" : "int",
+      "java-key-class": "java.math.BigDecimal"
+    }
+  } ]
+}
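
A hedged sketch of what reading that schema should yield with the
default data model (SpecificData); the file path is hypothetical and
the assertions mirror the schema's string properties:

    import java.math.BigDecimal;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.util.Utf8;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;

    public class StringBehaviorSketch {
      public static void main(String[] args) throws Exception {
        AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(
            new Path("/tmp/string-behavior.parquet")); // hypothetical path
        GenericRecord r = reader.read();
        assert r.get("default_class") instanceof Utf8;          // no property set
        assert r.get("string_class") instanceof String;         // avro.java.string=String
        assert r.get("stringable_class") instanceof BigDecimal; // java-class is honored
        reader.close();
      }
    }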

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
index d907bd4..b8f34af 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
@@ -21,6 +21,7 @@ package org.apache.parquet.avro;
 import com.google.common.io.Resources;
 import java.io.IOException;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
@@ -60,7 +61,7 @@ public class TestBackwardCompatibility {
     GenericRecord r;
     while ((r = reader.read()) != null) {
       Assert.assertTrue("Should read value into a String",
-          r.get("text") instanceof String);
+          r.get("text") instanceof Utf8);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index bea0237..4d37f40 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.avro;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
@@ -149,10 +150,10 @@ public class TestReadWrite {
         .build();
 
     // Write a record with a null value
-    Map<String, Integer> map = new HashMap<String, Integer>();
-    map.put("thirty-four", 34);
-    map.put("eleventy-one", null);
-    map.put("one-hundred", 100);
+    Map<CharSequence, Integer> map = new HashMap<CharSequence, Integer>();
+    map.put(str("thirty-four"), 34);
+    map.put(str("eleventy-one"), null);
+    map.put(str("one-hundred"), 100);
 
     GenericData.Record record = new GenericRecordBuilder(schema)
         .set("mymap", map).build();
@@ -221,7 +222,7 @@ public class TestReadWrite {
     GenericRecord nextRecord = reader.read();
 
     assertNotNull(nextRecord);
-    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+    assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
   }
 
   @Test
@@ -298,14 +299,14 @@ public class TestReadWrite {
     assertEquals(3.1f, nextRecord.get("myfloat"));
     assertEquals(4.1, nextRecord.get("mydouble"));
     assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
-    assertEquals("hello", nextRecord.get("mystring"));
+    assertEquals(str("hello"), nextRecord.get("mystring"));
     assertEquals(expectedEnumSymbol, nextRecord.get("myenum"));
     assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
     assertEquals(integerArray, nextRecord.get("myarray"));
     assertEquals(emptyArray, nextRecord.get("myemptyarray"));
     assertEquals(integerArray, nextRecord.get("myoptionalarray"));
     assertEquals(genericIntegerArrayWithNulls, nextRecord.get("myarrayofoptional"));
-    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+    assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
     assertEquals(emptyMap, nextRecord.get("myemptymap"));
     assertEquals(genericFixed, nextRecord.get("myfixed"));
   }
@@ -517,16 +518,22 @@ public class TestReadWrite {
     assertEquals(3.1f, nextRecord.get("myfloat"));
     assertEquals(4.1, nextRecord.get("mydouble"));
     assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
-    assertEquals("hello", nextRecord.get("mystring"));
-    assertEquals("a", nextRecord.get("myenum"));
+    assertEquals(str("hello"), nextRecord.get("mystring"));
+    assertEquals(str("a"), nextRecord.get("myenum")); // enum symbols are unknown
     assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
     assertEquals(integerArray, nextRecord.get("myarray"));
     assertEquals(integerArray, nextRecord.get("myoptionalarray"));
     assertEquals(ingeterArrayWithNulls, nextRecord.get("myarrayofoptional"));
     assertEquals(genericRecordArray, nextRecord.get("myrecordarray"));
-    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+    assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
     assertEquals(genericFixed, nextRecord.get("myfixed"));
 
   }
 
+  /**
+   * Return a String or Utf8 depending on whether compatibility is on
+   */
+  public CharSequence str(String value) {
+    return compat ? value : new Utf8(value);
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java
deleted file mode 100644
index 5dd58f8..0000000
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldBehavior.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.avro;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.io.Resources;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.MessageTypeParser;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import sun.net.www.content.text.Generic;
-
-import static org.apache.parquet.avro.AvroTestUtil.array;
-import static org.apache.parquet.avro.AvroTestUtil.optional;
-import static org.apache.parquet.avro.AvroTestUtil.optionalField;
-import static org.apache.parquet.avro.AvroTestUtil.primitive;
-import static org.apache.parquet.avro.AvroTestUtil.record;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-@RunWith(Parameterized.class)
-public class TestReadWriteOldBehavior {
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() {
-    Object[][] data = new Object[][] {
-        { false },  // use the new converters
-        { true } }; // use the old converters
-    return Arrays.asList(data);
-  }
-
-  private final boolean compat;
-  private final Configuration testConf = new Configuration(false);
-
-  public TestReadWriteOldBehavior(boolean compat) {
-    this.compat = compat;
-    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
-  }
-
-  @Test
-  public void testEmptyArray() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        Resources.getResource("array.avsc").openStream());
-
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-
-    AvroParquetWriter<GenericRecord> writer =
-        new AvroParquetWriter<GenericRecord>(file, schema);
-
-    // Write a record with an empty array.
-    List<Integer> emptyArray = new ArrayList<Integer>();
-    GenericData.Record record = new GenericRecordBuilder(schema)
-        .set("myarray", emptyArray).build();
-    writer.write(record);
-    writer.close();
-
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
-    GenericRecord nextRecord = reader.read();
-
-    assertNotNull(nextRecord);
-    assertEquals(emptyArray, nextRecord.get("myarray"));
-  }
-
-  @Test
-  public void testEmptyMap() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        Resources.getResource("map.avsc").openStream());
-
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-
-    AvroParquetWriter<GenericRecord> writer = 
-        new AvroParquetWriter<GenericRecord>(file, schema);
-
-    // Write a record with an empty map.
-    ImmutableMap emptyMap = new ImmutableMap.Builder<String, Integer>().build();
-    GenericData.Record record = new GenericRecordBuilder(schema)
-        .set("mymap", emptyMap).build();
-    writer.write(record);
-    writer.close();
-
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
-    GenericRecord nextRecord = reader.read();
-
-    assertNotNull(nextRecord);
-    assertEquals(emptyMap, nextRecord.get("mymap"));
-  }
-
-  @Test
-  public void testMapWithNulls() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        Resources.getResource("map_with_nulls.avsc").openStream());
-
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-
-    AvroParquetWriter<GenericRecord> writer =
-        new AvroParquetWriter<GenericRecord>(file, schema);
-
-    // Write a record with a null value
-    Map<String, Integer> map = new HashMap<String, Integer>();
-    map.put("thirty-four", 34);
-    map.put("eleventy-one", null);
-    map.put("one-hundred", 100);
-
-    GenericData.Record record = new GenericRecordBuilder(schema)
-        .set("mymap", map).build();
-    writer.write(record);
-    writer.close();
-
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
-    GenericRecord nextRecord = reader.read();
-
-    assertNotNull(nextRecord);
-    assertEquals(map, nextRecord.get("mymap"));
-  }
-
-  @Test(expected=RuntimeException.class)
-  public void testMapRequiredValueWithNull() throws Exception {
-    Schema schema = Schema.createRecord("record1", null, null, false);
-    schema.setFields(Lists.newArrayList(
-        new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
-
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-
-    AvroParquetWriter<GenericRecord> writer =
-        new AvroParquetWriter<GenericRecord>(file, schema);
-
-    // Write a record with a null value
-    Map<String, Integer> map = new HashMap<String, Integer>();
-    map.put("thirty-four", 34);
-    map.put("eleventy-one", null);
-    map.put("one-hundred", 100);
-
-    GenericData.Record record = new GenericRecordBuilder(schema)
-        .set("mymap", map).build();
-    writer.write(record);
-  }
-
-  @Test
-  public void testMapWithUtf8Key() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        Resources.getResource("map.avsc").openStream());
-
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-
-    AvroParquetWriter<GenericRecord> writer = 
-        new AvroParquetWriter<GenericRecord>(file, schema);
-
-    // Write a record with a map with Utf8 keys.
-    GenericData.Record record = new GenericRecordBuilder(schema)
-        .set("mymap", ImmutableMap.of(new Utf8("a"), 1, new Utf8("b"), 2))
-        .build();
-    writer.write(record);
-    writer.close();
-
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
-    GenericRecord nextRecord = reader.read();
-
-    assertNotNull(nextRecord);
-    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
-  }
-
-  @Test
-  public void testAll() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        Resources.getResource("all.avsc").openStream());
-
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-    
-    AvroParquetWriter<GenericRecord> writer = new
-        AvroParquetWriter<GenericRecord>(file, schema);
-
-    GenericData.Record nestedRecord = new GenericRecordBuilder(
-        schema.getField("mynestedrecord").schema())
-            .set("mynestedint", 1).build();
-
-    List<Integer> integerArray = Arrays.asList(1, 2, 3);
-    GenericData.Array<Integer> genericIntegerArray = new GenericData.Array<Integer>(
-        Schema.createArray(Schema.create(Schema.Type.INT)), integerArray);
-
-    GenericFixed genericFixed = new GenericData.Fixed(
-        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
-
-    List<Integer> emptyArray = new ArrayList<Integer>();
-    ImmutableMap emptyMap = new ImmutableMap.Builder<String, Integer>().build();
-
-    GenericData.Record record = new GenericRecordBuilder(schema)
-        .set("mynull", null)
-        .set("myboolean", true)
-        .set("myint", 1)
-        .set("mylong", 2L)
-        .set("myfloat", 3.1f)
-        .set("mydouble", 4.1)
-        .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
-        .set("mystring", "hello")
-        .set("mynestedrecord", nestedRecord)
-        .set("myenum", "a")
-        .set("myarray", genericIntegerArray)
-        .set("myemptyarray", emptyArray)
-        .set("myoptionalarray", genericIntegerArray)
-        .set("myarrayofoptional", genericIntegerArray)
-        .set("mymap", ImmutableMap.of("a", 1, "b", 2))
-        .set("myemptymap", emptyMap)
-        .set("myfixed", genericFixed)
-        .build();
-
-    writer.write(record);
-    writer.close();
-
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
-    GenericRecord nextRecord = reader.read();
-
-    Object expectedEnumSymbol = compat ? "a" :
-        new GenericData.EnumSymbol(schema.getField("myenum").schema(), "a");
-
-    assertNotNull(nextRecord);
-    assertEquals(null, nextRecord.get("mynull"));
-    assertEquals(true, nextRecord.get("myboolean"));
-    assertEquals(1, nextRecord.get("myint"));
-    assertEquals(2L, nextRecord.get("mylong"));
-    assertEquals(3.1f, nextRecord.get("myfloat"));
-    assertEquals(4.1, nextRecord.get("mydouble"));
-    assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
-    assertEquals("hello", nextRecord.get("mystring"));
-    assertEquals(expectedEnumSymbol, nextRecord.get("myenum"));
-    assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
-    assertEquals(integerArray, nextRecord.get("myarray"));
-    assertEquals(emptyArray, nextRecord.get("myemptyarray"));
-    assertEquals(integerArray, nextRecord.get("myoptionalarray"));
-    assertEquals(integerArray, nextRecord.get("myarrayofoptional"));
-    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
-    assertEquals(emptyMap, nextRecord.get("myemptymap"));
-    assertEquals(genericFixed, nextRecord.get("myfixed"));
-  }
-
-  @Test
-  public void testArrayWithNullValues() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        Resources.getResource("all.avsc").openStream());
-
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-
-    GenericData.Record nestedRecord = new GenericRecordBuilder(
-        schema.getField("mynestedrecord").schema())
-        .set("mynestedint", 1).build();
-
-    List<Integer> integerArray = Arrays.asList(1, 2, 3);
-    GenericData.Array<Integer> genericIntegerArray = new GenericData.Array<Integer>(
-        Schema.createArray(Schema.create(Schema.Type.INT)), integerArray);
-
-    GenericFixed genericFixed = new GenericData.Fixed(
-        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
-
-    List<Integer> emptyArray = new ArrayList<Integer>();
-    ImmutableMap emptyMap = new ImmutableMap.Builder<String, Integer>().build();
-
-    Schema arrayOfOptionalIntegers = Schema.createArray(
-        optional(Schema.create(Schema.Type.INT)));
-    GenericData.Array<Integer> genericIntegerArrayWithNulls =
-        new GenericData.Array<Integer>(
-            arrayOfOptionalIntegers,
-            Arrays.asList(1, null, 2, null, 3));
-
-    GenericData.Record record = new GenericRecordBuilder(schema)
-        .set("mynull", null)
-        .set("myboolean", true)
-        .set("myint", 1)
-        .set("mylong", 2L)
-        .set("myfloat", 3.1f)
-        .set("mydouble", 4.1)
-        .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
-        .set("mystring", "hello")
-        .set("mynestedrecord", nestedRecord)
-        .set("myenum", "a")
-        .set("myarray", genericIntegerArray)
-        .set("myemptyarray", emptyArray)
-        .set("myoptionalarray", genericIntegerArray)
-        .set("myarrayofoptional", genericIntegerArrayWithNulls)
-        .set("mymap", ImmutableMap.of("a", 1, "b", 2))
-        .set("myemptymap", emptyMap)
-        .set("myfixed", genericFixed)
-        .build();
-
-    final AvroParquetWriter<GenericRecord> writer =
-        new AvroParquetWriter<GenericRecord>(file, schema);
-
-    try {
-      writer.write(record);
-      fail("Should not succeed writing an array with null values");
-    } catch (Exception e) {
-      // expected
-    } finally {
-      writer.close();
-    }
-  }
-
-  @Test
-  public void testAllUsingDefaultAvroSchema() throws Exception {
-    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
-    tmp.deleteOnExit();
-    tmp.delete();
-    Path file = new Path(tmp.getPath());
-
-    // write file using Parquet APIs
-    ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<Map<String, Object>>(file,
-        new WriteSupport<Map<String, Object>>() {
-
-      private RecordConsumer recordConsumer;
-
-      @Override
-      public WriteContext init(Configuration configuration) {
-        return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
-            new HashMap<String, String>());
-      }
-
-      @Override
-      public void prepareForWrite(RecordConsumer recordConsumer) {
-        this.recordConsumer = recordConsumer;
-      }
-
-      @Override
-      public void write(Map<String, Object> record) {
-        recordConsumer.startMessage();
-
-        int index = 0;
-
-        recordConsumer.startField("myboolean", index);
-        recordConsumer.addBoolean((Boolean) record.get("myboolean"));
-        recordConsumer.endField("myboolean", index++);
-
-        recordConsumer.startField("myint", index);
-        recordConsumer.addInteger((Integer) record.get("myint"));
-        recordConsumer.endField("myint", index++);
-
-        recordConsumer.startField("mylong", index);
-        recordConsumer.addLong((Long) record.get("mylong"));
-        recordConsumer.endField("mylong", index++);
-
-        recordConsumer.startField("myfloat", index);
-        recordConsumer.addFloat((Float) record.get("myfloat"));
-        recordConsumer.endField("myfloat", index++);
-
-        recordConsumer.startField("mydouble", index);
-        recordConsumer.addDouble((Double) record.get("mydouble"));
-        recordConsumer.endField("mydouble", index++);
-
-        recordConsumer.startField("mybytes", index);
-        recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
-        recordConsumer.endField("mybytes", index++);
-
-        recordConsumer.startField("mystring", index);
-        recordConsumer.addBinary(Binary.fromString((String) record.get("mystring")));
-        recordConsumer.endField("mystring", index++);
-
-        recordConsumer.startField("mynestedrecord", index);
-        recordConsumer.startGroup();
-        recordConsumer.startField("mynestedint", 0);
-        recordConsumer.addInteger((Integer) record.get("mynestedint"));
-        recordConsumer.endField("mynestedint", 0);
-        recordConsumer.endGroup();
-        recordConsumer.endField("mynestedrecord", index++);
-
-        recordConsumer.startField("myenum", index);
-        recordConsumer.addBinary(Binary.fromString((String) record.get("myenum")));
-        recordConsumer.endField("myenum", index++);
-
-        recordConsumer.startField("myarray", index);
-        recordConsumer.startGroup();
-        recordConsumer.startField("array", 0);
-        for (int val : (int[]) record.get("myarray")) {
-          recordConsumer.addInteger(val);
-        }
-        recordConsumer.endField("array", 0);
-        recordConsumer.endGroup();
-        recordConsumer.endField("myarray", index++);
-
-        recordConsumer.startField("myoptionalarray", index);
-        recordConsumer.startGroup();
-        recordConsumer.startField("array", 0);
-        for (int val : (int[]) record.get("myoptionalarray")) {
-          recordConsumer.addInteger(val);
-        }
-        recordConsumer.endField("array", 0);
-        recordConsumer.endGroup();
-        recordConsumer.endField("myoptionalarray", index++);
-
-        recordConsumer.startField("myarrayofoptional", index);
-        recordConsumer.startGroup();
-        recordConsumer.startField("list", 0);
-        for (Integer val : (Integer[]) record.get("myarrayofoptional")) {
-          recordConsumer.startGroup();
-          if (val != null) {
-            recordConsumer.startField("element", 0);
-            recordConsumer.addInteger(val);
-            recordConsumer.endField("element", 0);
-          }
-          recordConsumer.endGroup();
-        }
-        recordConsumer.endField("list", 0);
-        recordConsumer.endGroup();
-        recordConsumer.endField("myarrayofoptional", index++);
-
-        recordConsumer.startField("myrecordarray", index);
-        recordConsumer.startGroup();
-        recordConsumer.startField("array", 0);
-        recordConsumer.startGroup();
-        recordConsumer.startField("a", 0);
-        for (int val : (int[]) record.get("myrecordarraya")) {
-          recordConsumer.addInteger(val);
-        }
-        recordConsumer.endField("a", 0);
-        recordConsumer.startField("b", 1);
-        for (int val : (int[]) record.get("myrecordarrayb")) {
-          recordConsumer.addInteger(val);
-        }
-        recordConsumer.endField("b", 1);
-        recordConsumer.endGroup();
-        recordConsumer.endField("array", 0);
-        recordConsumer.endGroup();
-        recordConsumer.endField("myrecordarray", index++);
-
-        recordConsumer.startField("mymap", index);
-        recordConsumer.startGroup();
-        recordConsumer.startField("map", 0);
-        recordConsumer.startGroup();
-        Map<String, Integer> mymap = (Map<String, Integer>) record.get("mymap");
-        recordConsumer.startField("key", 0);
-        for (String key : mymap.keySet()) {
-          recordConsumer.addBinary(Binary.fromString(key));
-        }
-        recordConsumer.endField("key", 0);
-        recordConsumer.startField("value", 1);
-        for (int val : mymap.values()) {
-          recordConsumer.addInteger(val);
-        }
-        recordConsumer.endField("value", 1);
-        recordConsumer.endGroup();
-        recordConsumer.endField("map", 0);
-        recordConsumer.endGroup();
-        recordConsumer.endField("mymap", index++);
-
-        recordConsumer.startField("myfixed", index);
-        recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
-        recordConsumer.endField("myfixed", index++);
-
-        recordConsumer.endMessage();
-      }
-    });
-    Map<String, Object> record = new HashMap<String, Object>();
-    record.put("myboolean", true);
-    record.put("myint", 1);
-    record.put("mylong", 2L);
-    record.put("myfloat", 3.1f);
-    record.put("mydouble", 4.1);
-    record.put("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)));
-    record.put("mystring", "hello");
-    record.put("myenum", "a");
-    record.put("mynestedint", 1);
-    record.put("myarray", new int[] {1, 2, 3});
-    record.put("myoptionalarray", new int[]{1, 2, 3});
-    record.put("myarrayofoptional", new Integer[] {1, null, 2, null, 3});
-    record.put("myrecordarraya", new int[] {1, 2, 3});
-    record.put("myrecordarrayb", new int[] {4, 5, 6});
-    record.put("mymap", ImmutableMap.of("a", 1, "b", 2));
-    record.put("myfixed", new byte[] { (byte) 65 });
-    parquetWriter.write(record);
-    parquetWriter.close();
-
-    Schema nestedRecordSchema = Schema.createRecord("mynestedrecord", null, null, false);
-    nestedRecordSchema.setFields(Arrays.asList(
-        new Schema.Field("mynestedint", Schema.create(Schema.Type.INT), null, null)
-    ));
-    GenericData.Record nestedRecord = new GenericRecordBuilder(nestedRecordSchema)
-        .set("mynestedint", 1).build();
-
-    List<Integer> integerArray = Arrays.asList(1, 2, 3);
-
-    Schema recordArraySchema = Schema.createRecord("array", null, null, false);
-    recordArraySchema.setFields(Arrays.asList(
-        new Schema.Field("a", Schema.create(Schema.Type.INT), null, null),
-        new Schema.Field("b", Schema.create(Schema.Type.INT), null, null)
-    ));
-    GenericRecordBuilder builder = new GenericRecordBuilder(recordArraySchema);
-    List<GenericData.Record> recordArray = new ArrayList<GenericData.Record>();
-    recordArray.add(builder.set("a", 1).set("b", 4).build());
-    recordArray.add(builder.set("a", 2).set("b", 5).build());
-    recordArray.add(builder.set("a", 3).set("b", 6).build());
-    GenericData.Array<GenericData.Record> genericRecordArray = new GenericData.Array<GenericData.Record>(
-        Schema.createArray(recordArraySchema), recordArray);
-
-    GenericFixed genericFixed = new GenericData.Fixed(
-        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
-
-    // 3-level lists are deserialized with the extra layer present
-    Schema elementSchema = record("list",
-        optionalField("element", primitive(Schema.Type.INT)));
-    GenericRecordBuilder elementBuilder = new GenericRecordBuilder(elementSchema);
-    GenericData.Array<GenericData.Record> genericRecordArrayWithNullIntegers =
-        new GenericData.Array<GenericData.Record>(array(elementSchema),
-            Arrays.asList(
-                elementBuilder.set("element", 1).build(),
-                elementBuilder.set("element", null).build(),
-                elementBuilder.set("element", 2).build(),
-                elementBuilder.set("element", null).build(),
-                elementBuilder.set("element", 3).build()
-            ));
-
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
-    GenericRecord nextRecord = reader.read();
-    assertNotNull(nextRecord);
-    assertEquals(true, nextRecord.get("myboolean"));
-    assertEquals(1, nextRecord.get("myint"));
-    assertEquals(2L, nextRecord.get("mylong"));
-    assertEquals(3.1f, nextRecord.get("myfloat"));
-    assertEquals(4.1, nextRecord.get("mydouble"));
-    assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
-    assertEquals("hello", nextRecord.get("mystring"));
-    assertEquals("a", nextRecord.get("myenum"));
-    assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
-    assertEquals(integerArray, nextRecord.get("myarray"));
-    assertEquals(integerArray, nextRecord.get("myoptionalarray"));
-    assertEquals(genericRecordArrayWithNullIntegers, nextRecord.get("myarrayofoptional"));
-    assertEquals(genericRecordArray, nextRecord.get("myrecordarray"));
-    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
-    assertEquals(genericFixed, nextRecord.get("myfixed"));
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
new file mode 100644
index 0000000..34a160a
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
@@ -0,0 +1,594 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import sun.net.www.content.text.Generic;
+
+import static org.apache.parquet.avro.AvroTestUtil.array;
+import static org.apache.parquet.avro.AvroTestUtil.optional;
+import static org.apache.parquet.avro.AvroTestUtil.optionalField;
+import static org.apache.parquet.avro.AvroTestUtil.primitive;
+import static org.apache.parquet.avro.AvroTestUtil.record;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+public class TestReadWriteOldListBehavior {
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] {
+        { false },  // use the new converters
+        { true } }; // use the old converters
+    return Arrays.asList(data);
+  }
+
+  private final boolean compat;
+  private final Configuration testConf = new Configuration(false);
+
+  public TestReadWriteOldListBehavior(boolean compat) {
+    this.compat = compat;
+    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+  }
+
+  @Test
+  public void testEmptyArray() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("array.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with an empty array.
+    List<Integer> emptyArray = new ArrayList<Integer>();
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("myarray", emptyArray).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(emptyArray, nextRecord.get("myarray"));
+  }
+
+  @Test
+  public void testEmptyMap() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer = 
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with an empty map.
+    ImmutableMap emptyMap = new ImmutableMap.Builder<String, Integer>().build();
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", emptyMap).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(emptyMap, nextRecord.get("mymap"));
+  }
+
+  @Test
+  public void testMapWithNulls() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map_with_nulls.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a null value
+    Map<CharSequence, Integer> map = new HashMap<CharSequence, Integer>();
+    map.put(str("thirty-four"), 34);
+    map.put(str("eleventy-one"), null);
+    map.put(str("one-hundred"), 100);
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", map).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(map, nextRecord.get("mymap"));
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testMapRequiredValueWithNull() throws Exception {
+    Schema schema = Schema.createRecord("record1", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a null value
+    Map<String, Integer> map = new HashMap<String, Integer>();
+    map.put("thirty-four", 34);
+    map.put("eleventy-one", null);
+    map.put("one-hundred", 100);
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", map).build();
+    writer.write(record);
+  }
+
+  @Test
+  public void testMapWithUtf8Key() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a map with Utf8 keys.
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", ImmutableMap.of(new Utf8("a"), 1, new Utf8("b"), 2))
+        .build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
+  }
+
+  @Test
+  public void testAll() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("all.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    GenericData.Record nestedRecord = new GenericRecordBuilder(
+        schema.getField("mynestedrecord").schema())
+            .set("mynestedint", 1).build();
+
+    List<Integer> integerArray = Arrays.asList(1, 2, 3);
+    GenericData.Array<Integer> genericIntegerArray = new GenericData.Array<Integer>(
+        Schema.createArray(Schema.create(Schema.Type.INT)), integerArray);
+
+    GenericFixed genericFixed = new GenericData.Fixed(
+        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
+
+    List<Integer> emptyArray = new ArrayList<Integer>();
+    ImmutableMap<String, Integer> emptyMap = new ImmutableMap.Builder<String, Integer>().build();
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mynull", null)
+        .set("myboolean", true)
+        .set("myint", 1)
+        .set("mylong", 2L)
+        .set("myfloat", 3.1f)
+        .set("mydouble", 4.1)
+        .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
+        .set("mystring", "hello")
+        .set("mynestedrecord", nestedRecord)
+        .set("myenum", "a")
+        .set("myarray", genericIntegerArray)
+        .set("myemptyarray", emptyArray)
+        .set("myoptionalarray", genericIntegerArray)
+        .set("myarrayofoptional", genericIntegerArray)
+        .set("mymap", ImmutableMap.of("a", 1, "b", 2))
+        .set("myemptymap", emptyMap)
+        .set("myfixed", genericFixed)
+        .build();
+
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    Object expectedEnumSymbol = compat ? "a" :
+        new GenericData.EnumSymbol(schema.getField("myenum").schema(), "a");
+
+    assertNotNull(nextRecord);
+    assertEquals(null, nextRecord.get("mynull"));
+    assertEquals(true, nextRecord.get("myboolean"));
+    assertEquals(1, nextRecord.get("myint"));
+    assertEquals(2L, nextRecord.get("mylong"));
+    assertEquals(3.1f, nextRecord.get("myfloat"));
+    assertEquals(4.1, nextRecord.get("mydouble"));
+    assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
+    assertEquals(str("hello"), nextRecord.get("mystring"));
+    assertEquals(expectedEnumSymbol, nextRecord.get("myenum"));
+    assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
+    assertEquals(integerArray, nextRecord.get("myarray"));
+    assertEquals(emptyArray, nextRecord.get("myemptyarray"));
+    assertEquals(integerArray, nextRecord.get("myoptionalarray"));
+    assertEquals(integerArray, nextRecord.get("myarrayofoptional"));
+    assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
+    assertEquals(emptyMap, nextRecord.get("myemptymap"));
+    assertEquals(genericFixed, nextRecord.get("myfixed"));
+  }
+
+  @Test
+  public void testArrayWithNullValues() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("all.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    GenericData.Record nestedRecord = new GenericRecordBuilder(
+        schema.getField("mynestedrecord").schema())
+        .set("mynestedint", 1).build();
+
+    List<Integer> integerArray = Arrays.asList(1, 2, 3);
+    GenericData.Array<Integer> genericIntegerArray = new GenericData.Array<Integer>(
+        Schema.createArray(Schema.create(Schema.Type.INT)), integerArray);
+
+    GenericFixed genericFixed = new GenericData.Fixed(
+        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
+
+    List<Integer> emptyArray = new ArrayList<Integer>();
+    ImmutableMap<String, Integer> emptyMap = new ImmutableMap.Builder<String, Integer>().build();
+
+    Schema arrayOfOptionalIntegers = Schema.createArray(
+        optional(Schema.create(Schema.Type.INT)));
+    GenericData.Array<Integer> genericIntegerArrayWithNulls =
+        new GenericData.Array<Integer>(
+            arrayOfOptionalIntegers,
+            Arrays.asList(1, null, 2, null, 3));
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mynull", null)
+        .set("myboolean", true)
+        .set("myint", 1)
+        .set("mylong", 2L)
+        .set("myfloat", 3.1f)
+        .set("mydouble", 4.1)
+        .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
+        .set("mystring", "hello")
+        .set("mynestedrecord", nestedRecord)
+        .set("myenum", "a")
+        .set("myarray", genericIntegerArray)
+        .set("myemptyarray", emptyArray)
+        .set("myoptionalarray", genericIntegerArray)
+        .set("myarrayofoptional", genericIntegerArrayWithNulls)
+        .set("mymap", ImmutableMap.of("a", 1, "b", 2))
+        .set("myemptymap", emptyMap)
+        .set("myfixed", genericFixed)
+        .build();
+
+    final AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
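+    // The old two-level list layout cannot encode a null element, so this
+    // write is expected to fail.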
+    try {
+      writer.write(record);
+      fail("Should not succeed writing an array with null values");
+    } catch (Exception e) {
+      // expected
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test
+  public void testAllUsingDefaultAvroSchema() throws Exception {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    // write file using Parquet APIs
+    ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<Map<String, Object>>(file,
+        new WriteSupport<Map<String, Object>>() {
+
+      private RecordConsumer recordConsumer;
+
+      @Override
+      public WriteContext init(Configuration configuration) {
+        return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
+            new HashMap<String, String>());
+      }
+
+      @Override
+      public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.recordConsumer = recordConsumer;
+      }
+
+      @Override
+      public void write(Map<String, Object> record) {
+        recordConsumer.startMessage();
+
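+        // The second argument to startField/endField is the field's index in
+        // ALL_PARQUET_SCHEMA; it is incremented as each field is written.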
+        int index = 0;
+
+        recordConsumer.startField("myboolean", index);
+        recordConsumer.addBoolean((Boolean) record.get("myboolean"));
+        recordConsumer.endField("myboolean", index++);
+
+        recordConsumer.startField("myint", index);
+        recordConsumer.addInteger((Integer) record.get("myint"));
+        recordConsumer.endField("myint", index++);
+
+        recordConsumer.startField("mylong", index);
+        recordConsumer.addLong((Long) record.get("mylong"));
+        recordConsumer.endField("mylong", index++);
+
+        recordConsumer.startField("myfloat", index);
+        recordConsumer.addFloat((Float) record.get("myfloat"));
+        recordConsumer.endField("myfloat", index++);
+
+        recordConsumer.startField("mydouble", index);
+        recordConsumer.addDouble((Double) record.get("mydouble"));
+        recordConsumer.endField("mydouble", index++);
+
+        recordConsumer.startField("mybytes", index);
+        recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
+        recordConsumer.endField("mybytes", index++);
+
+        recordConsumer.startField("mystring", index);
+        recordConsumer.addBinary(Binary.fromString((String) record.get("mystring")));
+        recordConsumer.endField("mystring", index++);
+
+        recordConsumer.startField("mynestedrecord", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("mynestedint", 0);
+        recordConsumer.addInteger((Integer) record.get("mynestedint"));
+        recordConsumer.endField("mynestedint", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("mynestedrecord", index++);
+
+        recordConsumer.startField("myenum", index);
+        recordConsumer.addBinary(Binary.fromString((String) record.get("myenum")));
+        recordConsumer.endField("myenum", index++);
+
+        recordConsumer.startField("myarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        for (int val : (int[]) record.get("myarray")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myarray", index++);
+
+        recordConsumer.startField("myoptionalarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        for (int val : (int[]) record.get("myoptionalarray")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myoptionalarray", index++);
+
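+        // myarrayofoptional uses the three-level list layout: a repeated
+        // "list" group whose optional "element" field is omitted for nulls.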
+        recordConsumer.startField("myarrayofoptional", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("list", 0);
+        for (Integer val : (Integer[]) record.get("myarrayofoptional")) {
+          recordConsumer.startGroup();
+          if (val != null) {
+            recordConsumer.startField("element", 0);
+            recordConsumer.addInteger(val);
+            recordConsumer.endField("element", 0);
+          }
+          recordConsumer.endGroup();
+        }
+        recordConsumer.endField("list", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myarrayofoptional", index++);
+
+        recordConsumer.startField("myrecordarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        recordConsumer.startGroup();
+        recordConsumer.startField("a", 0);
+        for (int val : (int[]) record.get("myrecordarraya")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("a", 0);
+        recordConsumer.startField("b", 1);
+        for (int val : (int[]) record.get("myrecordarrayb")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("b", 1);
+        recordConsumer.endGroup();
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myrecordarray", index++);
+
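+        // Keys and values are written as parallel repeated fields; keySet()
+        // and values() of the same HashMap iterate in a matching order.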
+        recordConsumer.startField("mymap", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("map", 0);
+        recordConsumer.startGroup();
+        Map<String, Integer> mymap = (Map<String, Integer>) record.get("mymap");
+        recordConsumer.startField("key", 0);
+        for (String key : mymap.keySet()) {
+          recordConsumer.addBinary(Binary.fromString(key));
+        }
+        recordConsumer.endField("key", 0);
+        recordConsumer.startField("value", 1);
+        for (int val : mymap.values()) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("value", 1);
+        recordConsumer.endGroup();
+        recordConsumer.endField("map", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("mymap", index++);
+
+        recordConsumer.startField("myfixed", index);
+        recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
+        recordConsumer.endField("myfixed", index++);
+
+        recordConsumer.endMessage();
+      }
+    });
+    Map<String, Object> record = new HashMap<String, Object>();
+    record.put("myboolean", true);
+    record.put("myint", 1);
+    record.put("mylong", 2L);
+    record.put("myfloat", 3.1f);
+    record.put("mydouble", 4.1);
+    record.put("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)));
+    record.put("mystring", "hello");
+    record.put("myenum", "a");
+    record.put("mynestedint", 1);
+    record.put("myarray", new int[] {1, 2, 3});
+    record.put("myoptionalarray", new int[]{1, 2, 3});
+    record.put("myarrayofoptional", new Integer[] {1, null, 2, null, 3});
+    record.put("myrecordarraya", new int[] {1, 2, 3});
+    record.put("myrecordarrayb", new int[] {4, 5, 6});
+    record.put("mymap", ImmutableMap.of("a", 1, "b", 2));
+    record.put("myfixed", new byte[] { (byte) 65 });
+    parquetWriter.write(record);
+    parquetWriter.close();
+
+    Schema nestedRecordSchema = Schema.createRecord("mynestedrecord", null, null, false);
+    nestedRecordSchema.setFields(Arrays.asList(
+        new Schema.Field("mynestedint", Schema.create(Schema.Type.INT), null, null)
+    ));
+    GenericData.Record nestedRecord = new GenericRecordBuilder(nestedRecordSchema)
+        .set("mynestedint", 1).build();
+
+    List<Integer> integerArray = Arrays.asList(1, 2, 3);
+
+    Schema recordArraySchema = Schema.createRecord("array", null, null, false);
+    recordArraySchema.setFields(Arrays.asList(
+        new Schema.Field("a", Schema.create(Schema.Type.INT), null, null),
+        new Schema.Field("b", Schema.create(Schema.Type.INT), null, null)
+    ));
+    GenericRecordBuilder builder = new GenericRecordBuilder(recordArraySchema);
+    List<GenericData.Record> recordArray = new ArrayList<GenericData.Record>();
+    recordArray.add(builder.set("a", 1).set("b", 4).build());
+    recordArray.add(builder.set("a", 2).set("b", 5).build());
+    recordArray.add(builder.set("a", 3).set("b", 6).build());
+    GenericData.Array<GenericData.Record> genericRecordArray = new GenericData.Array<GenericData.Record>(
+        Schema.createArray(recordArraySchema), recordArray);
+
+    GenericFixed genericFixed = new GenericData.Fixed(
+        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
+
+    // 3-level lists are deserialized with the extra layer present
+    Schema elementSchema = record("list",
+        optionalField("element", primitive(Schema.Type.INT)));
+    GenericRecordBuilder elementBuilder = new GenericRecordBuilder(elementSchema);
+    GenericData.Array<GenericData.Record> genericRecordArrayWithNullIntegers =
+        new GenericData.Array<GenericData.Record>(array(elementSchema),
+            Arrays.asList(
+                elementBuilder.set("element", 1).build(),
+                elementBuilder.set("element", null).build(),
+                elementBuilder.set("element", 2).build(),
+                elementBuilder.set("element", null).build(),
+                elementBuilder.set("element", 3).build()
+            ));
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
+    GenericRecord nextRecord = reader.read();
+    assertNotNull(nextRecord);
+    assertEquals(true, nextRecord.get("myboolean"));
+    assertEquals(1, nextRecord.get("myint"));
+    assertEquals(2L, nextRecord.get("mylong"));
+    assertEquals(3.1f, nextRecord.get("myfloat"));
+    assertEquals(4.1, nextRecord.get("mydouble"));
+    assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
+    assertEquals(str("hello"), nextRecord.get("mystring"));
+    assertEquals(str("a"), nextRecord.get("myenum"));
+    assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
+    assertEquals(integerArray, nextRecord.get("myarray"));
+    assertEquals(integerArray, nextRecord.get("myoptionalarray"));
+    assertEquals(genericRecordArrayWithNullIntegers, nextRecord.get("myarrayofoptional"));
+    assertEquals(genericRecordArray, nextRecord.get("myrecordarray"));
+    assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
+    assertEquals(genericFixed, nextRecord.get("myfixed"));
+  }
+
+  /**
+   * Returns a String when compatibility mode is on, or a Utf8 otherwise,
+   * matching the representation the converters under test produce.
+   */
+  public CharSequence str(String value) {
+    return compat ? value : new Utf8(value);
+  }
+}
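
For readers skimming the diff, here is a minimal sketch of how the compatibility
flag exercised by these tests changes the materialized string type, mirroring
the str() helper above. The file path and field name are hypothetical stand-ins,
and the file is assumed to contain at least one record:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.avro.AvroReadSupport;

    public class StringBehaviorSketch {
      public static void main(String[] args) throws Exception {
        Path file = new Path("/tmp/strings.parquet"); // hypothetical file

        // Compatibility on: strings are materialized as java.lang.String.
        Configuration compatConf = new Configuration(false);
        compatConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
        AvroParquetReader<GenericRecord> compatReader =
            new AvroParquetReader<GenericRecord>(compatConf, file);
        System.out.println(compatReader.read().get("mystring").getClass());
        // -> class java.lang.String

        // Compatibility off: generic data defaults to org.apache.avro.util.Utf8.
        Configuration conf = new Configuration(false);
        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
        AvroParquetReader<GenericRecord> reader =
            new AvroParquetReader<GenericRecord>(conf, file);
        System.out.println(reader.read().get("mystring").getClass());
        // -> class org.apache.avro.util.Utf8
      }
    }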

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/918609f2/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
index dffaf57..c4bf5bd 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
@@ -30,6 +30,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetReader;
@@ -65,14 +66,14 @@ public class TestReflectReadWrite {
 
     Path path = writePojosToParquetFile(2, CompressionCodecName.UNCOMPRESSED, false);
     ParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(conf, path);
-    GenericRecord object = getGenericPojo();
+    GenericRecord object = getGenericPojoUtf8();
     for (int i = 0; i < 2; i += 1) {
       assertEquals(object, reader.read());
     }
     assertNull(reader.read());
   }
 
-  private GenericRecord getGenericPojo() {
+  private GenericRecord getGenericPojoUtf8() {
     Schema schema = ReflectData.get().getSchema(Pojo.class);
     GenericData.Record record = new GenericData.Record(schema);
     record.put("myboolean", true);
@@ -83,21 +84,21 @@ public class TestReflectReadWrite {
     record.put("myfloat", 3.1f);
     record.put("mydouble", 4.1);
     record.put("mybytes", ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
-    record.put("mystring", "Hello");
+    record.put("mystring", new Utf8("Hello"));
     record.put("myenum", new GenericData.EnumSymbol(
         schema.getField("myenum").schema(), "A"));
-    Map<String, String> map = new HashMap<String, String>();
-    map.put("a", "1");
-    map.put("b", "2");
+    Map<CharSequence, CharSequence> map = new HashMap<CharSequence, CharSequence>();
+    map.put(new Utf8("a"), new Utf8("1"));
+    map.put(new Utf8("b"), new Utf8("2"));
     record.put("mymap", map);
     record.put("myshortarray", new GenericData.Array<Integer>(
         schema.getField("myshortarray").schema(), Lists.newArrayList(1, 2)));
     record.put("myintarray", new GenericData.Array<Integer>(
         schema.getField("myintarray").schema(), Lists.newArrayList(1, 2)));
-    record.put("mystringarray", new GenericData.Array<String>(
-        schema.getField("mystringarray").schema(), Lists.newArrayList("a", "b")));
-    record.put("mylist", new GenericData.Array<String>(
-        schema.getField("mylist").schema(), Lists.newArrayList("a", "b", "c")));
+    record.put("mystringarray", new GenericData.Array<Utf8>(
+        schema.getField("mystringarray").schema(), Lists.newArrayList(new Utf8("a"), new Utf8("b"))));
+    record.put("mylist", new GenericData.Array<Utf8>(
+        schema.getField("mylist").schema(), Lists.newArrayList(new Utf8("a"), new Utf8("b"), new Utf8("c"))));
     return record;
   }
 

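A practical consequence of the Utf8 map keys above: Utf8 and String never
compare equal to each other, so lookups against a map returned by the reader
must use the same representation it produced. A self-contained illustration
(not part of the commit):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.util.Utf8;

    public class Utf8KeyLookup {
      public static void main(String[] args) {
        Map<CharSequence, Integer> map = new HashMap<CharSequence, Integer>();
        map.put(new Utf8("a"), 1);
        // String and Utf8 never compare equal, even for the same characters.
        System.out.println(map.get("a"));           // null
        System.out.println(map.get(new Utf8("a"))); // 1
      }
    }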
