parquet-commits mailing list archives

From b...@apache.org
Subject [48/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:45 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
new file mode 100644
index 0000000..c4585a7
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -0,0 +1,999 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import static org.apache.parquet.avro.AvroTestUtil.array;
+import static org.apache.parquet.avro.AvroTestUtil.field;
+import static org.apache.parquet.avro.AvroTestUtil.instance;
+import static org.apache.parquet.avro.AvroTestUtil.optional;
+import static org.apache.parquet.avro.AvroTestUtil.optionalField;
+import static org.apache.parquet.avro.AvroTestUtil.primitive;
+import static org.apache.parquet.avro.AvroTestUtil.record;
+
+public class TestArrayCompatibility {
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
+
+  @BeforeClass
+  public static void setupNewBehaviorConfiguration() {
+    NEW_BEHAVIOR_CONF.setBoolean(
+        AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
+  }
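+
+  // A rough sketch (not normative) of what this flag selects: given
+  //   optional group locations (LIST) { repeated group g { ... } }
+  // the old behavior reads the repeated group "g" itself as the array's
+  // element type, while the new behavior treats "g" as the synthetic wrapper
+  // defined by the Parquet list spec and reads its single child as the
+  // element. The tests below exercise both readings.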
+
+  @Test
+  @Ignore(value="Not yet supported")
+  public void testUnannotatedListOfPrimitives() throws Exception {
+    Path test = writeDirect(
+        "message UnannotatedListOfPrimitives {" +
+            "  repeated int32 list_of_ints;" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_ints", 0);
+
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("list_of_ints", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema expectedSchema = record("OldPrimitiveInList",
+        field("list_of_ints", array(primitive(Schema.Type.INT))));
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "list_of_ints", Arrays.asList(34, 35, 36));
+
+    // both should behave the same way
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  @Ignore(value="Not yet supported")
+  public void testUnannotatedListOfGroups() throws Exception {
+    Path test = writeDirect(
+        "message UnannotatedListOfGroups {" +
+            "  repeated group list_of_points {" +
+            "    required float x;" +
+            "    required float y;" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_points", 0);
+
+            rc.startGroup();
+            rc.startField("x", 0);
+            rc.addFloat(1.0f);
+            rc.endField("x", 0);
+            rc.startField("y", 1);
+            rc.addFloat(1.0f);
+            rc.endField("y", 1);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("x", 0);
+            rc.addFloat(2.0f);
+            rc.endField("x", 0);
+            rc.startField("y", 1);
+            rc.addFloat(2.0f);
+            rc.endField("y", 1);
+            rc.endGroup();
+
+            rc.endField("list_of_points", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema point = record("?",
+        field("x", primitive(Schema.Type.FLOAT)),
+        field("y", primitive(Schema.Type.FLOAT)));
+    Schema expectedSchema = record("OldPrimitiveInList",
+        field("list_of_points", array(point)));
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "list_of_points", Arrays.asList(
+            instance(point, "x", 1.0f, "y", 1.0f),
+            instance(point, "x", 2.0f, "y", 2.0f)));
+
+    // both should behave the same way
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  public void testRepeatedPrimitiveInList() throws Exception {
+    Path test = writeDirect(
+        "message RepeatedPrimitiveInList {" +
+            "  required group list_of_ints (LIST) {" +
+            "    repeated int32 array;" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_ints", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0);
+
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("array", 0);
+            rc.endGroup();
+
+            rc.endField("list_of_ints", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema expectedSchema = record("RepeatedPrimitiveInList",
+        field("list_of_ints", array(Schema.create(Schema.Type.INT))));
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "list_of_ints", Arrays.asList(34, 35, 36));
+
+    // both should behave the same way
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  public void testMultiFieldGroupInList() throws Exception {
+    // tests the missing element layer, detected by a multi-field group
+    Path test = writeDirect(
+        "message MultiFieldGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group element {" +
+            "      required double latitude;" +
+            "      required double longitude;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+    Schema expectedSchema = record("MultiFieldGroupInList",
+        optionalField("locations", array(location)));
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 0.0),
+            instance(location, "latitude", 0.0, "longitude", 180.0)));
+
+    // both should behave the same way
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  public void testSingleFieldGroupInList() throws Exception {
+    // this tests the case where non-avro older data has an ambiguous list
+    Path test = writeDirect(
+        "message SingleFieldGroupInList {" +
+            "  optional group single_element_groups (LIST) {" +
+            "    repeated group single_element_group {" +
+            "      required int64 count;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("single_element_groups", 0);
+
+            rc.startGroup();
+            rc.startField("single_element_group", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(1234L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(2345L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.endField("single_element_group", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("single_element_groups", 0);
+            rc.endMessage();
+          }
+        });
+
+    // can't tell from storage whether this should be a list of single-field
+    // records or if the single_element_group layer is synthetic.
+
+    // old behavior - assume that the repeated type is the element type
+    Schema singleElementGroupSchema = record("single_element_group",
+        field("count", primitive(Schema.Type.LONG)));
+    Schema oldSchema = record("SingleFieldGroupInList",
+        optionalField("single_element_groups", array(singleElementGroupSchema)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "single_element_groups", Arrays.asList(
+            instance(singleElementGroupSchema, "count", 1234L),
+            instance(singleElementGroupSchema, "count", 2345L)));
+
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    // new behavior - assume that single_element_group is synthetic (in spec)
+    Schema newSchema = record("SingleFieldGroupInList",
+        optionalField("single_element_groups", array(primitive(Schema.Type.LONG))));
+    GenericRecord newRecord = instance(newSchema,
+        "single_element_groups", Arrays.asList(1234L, 2345L));
+
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+  }
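+
+  // In Avro JSON terms, the two readings asserted above are roughly:
+  //   old: {"type": "array", "items": {"type": "record",
+  //          "name": "single_element_group",
+  //          "fields": [{"name": "count", "type": "long"}]}}
+  //   new: {"type": "array", "items": "long"}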
+
+  @Test
+  public void testSingleFieldGroupInListWithSchema() throws Exception {
+    // this tests the case where older data has an ambiguous structure, but the
+    // correct interpretation can be determined from the avro schema
+
+    Schema singleElementRecord = record("single_element_group",
+        field("count", primitive(Schema.Type.LONG)));
+
+    Schema expectedSchema = record("SingleFieldGroupInList",
+        optionalField("single_element_groups",
+            array(singleElementRecord)));
+
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(AvroWriteSupport.AVRO_SCHEMA, expectedSchema.toString());
+
+    Path test = writeDirect(
+        "message SingleFieldGroupInList {" +
+            "  optional group single_element_groups (LIST) {" +
+            "    repeated group single_element_group {" +
+            "      required int64 count;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("single_element_groups", 0);
+
+            rc.startGroup();
+            rc.startField("single_element_group", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(1234L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(2345L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.endField("single_element_group", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("single_element_groups", 0);
+            rc.endMessage();
+          }
+        },
+        metadata);
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "single_element_groups", Arrays.asList(
+            instance(singleElementRecord, "count", 1234L),
+            instance(singleElementRecord, "count", 2345L)));
+
+    // both should behave the same way because the schema is present
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  public void testNewOptionalGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message NewOptionalGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group list {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("list", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a null element (element field is omitted)
+            rc.startGroup(); // array level
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("list", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("list", optionalField("element", location));
+    Schema oldSchema = record("NewOptionalGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0)),
+            instance(elementRecord),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0))));
+
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    // new behavior - the repeated "list" group is the synthetic layer (per the spec)
+    Schema newSchema = record("NewOptionalGroupInList",
+        optionalField("locations", array(optional(location))));
+    GenericRecord newRecord = instance(newSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 0.0),
+            null,
+            instance(location, "latitude", 0.0, "longitude", 180.0)));
+
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+  }
+
+  @Test
+  public void testNewRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message NewRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group list {" +
+            "      required group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("list", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("list", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("list", field("element", location));
+    Schema oldSchema = record("NewRequiredGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0)),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    // new behavior - the repeated "list" group is the synthetic layer (per the spec)
+    Schema newSchema = record("NewRequiredGroupInList",
+        optionalField("locations", array(location)));
+    GenericRecord newRecord = instance(newSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 180.0),
+            instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+  }
+
+  @Test
+  public void testAvroCompatRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message AvroCompatRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group array {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("array", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("array", optionalField("element", location));
+    Schema oldSchema = record("AvroCompatRequiredGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0)),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+    // both should detect the "array" name
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+  }
+
+  @Test
+  public void testAvroCompatRequiredGroupInListWithSchema() throws Exception {
+    Path test = writeDirect(
+        "message AvroCompatRequiredGroupInListWithSchema {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group array {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("array", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    Schema newSchema = record("HiveCompatOptionalGroupInList",
+        optionalField("locations", array(optional(location))));
+    GenericRecord newRecord = instance(newSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 180.0),
+            instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+    Configuration oldConfWithSchema = new Configuration();
+    AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);
+
+    // both should use the schema structure that is provided
+    assertReaderContains(
+        new AvroParquetReader<GenericRecord>(oldConfWithSchema, test),
+        newSchema, newRecord);
+
+    Configuration newConfWithSchema = new Configuration(NEW_BEHAVIOR_CONF);
+    AvroReadSupport.setAvroReadSchema(newConfWithSchema, newSchema);
+
+    assertReaderContains(
+        new AvroParquetReader<GenericRecord>(newConfWithSchema, test),
+        newSchema, newRecord);
+  }
+
+  @Test
+  public void testThriftCompatRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message ThriftCompatRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group locations_tuple {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("locations_tuple", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("locations_tuple", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("locations_tuple", optionalField("element", location));
+    Schema oldSchema = record("ThriftCompatRequiredGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0)),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+    // both should detect the "array" name
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+  }
+
+  @Test
+  public void testHiveCompatOptionalGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message HiveCompatOptionalGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group bag {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("bag", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("bag", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("bag", optionalField("element", location));
+    Schema oldSchema = record("HiveCompatOptionalGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0)),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+    // both should detect the "array" name
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    Schema newSchema = record("HiveCompatOptionalGroupInList",
+        optionalField("locations", array(optional(location))));
+    GenericRecord newRecord = instance(newSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 180.0),
+            instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+  }
+
+  private interface DirectWriter {
+    public void write(RecordConsumer consumer);
+  }
+
+  private static class DirectWriteSupport extends WriteSupport<Void> {
+    private RecordConsumer recordConsumer;
+    private final MessageType type;
+    private final DirectWriter writer;
+    private final Map<String, String> metadata;
+
+    private DirectWriteSupport(MessageType type, DirectWriter writer,
+                               Map<String, String> metadata) {
+      this.type = type;
+      this.writer = writer;
+      this.metadata = metadata;
+    }
+
+    @Override
+    public WriteContext init(Configuration configuration) {
+      return new WriteContext(type, metadata);
+    }
+
+    @Override
+    public void prepareForWrite(RecordConsumer recordConsumer) {
+      this.recordConsumer = recordConsumer;
+    }
+
+    @Override
+    public void write(Void record) {
+      writer.write(recordConsumer);
+    }
+  }
+
+  private Path writeDirect(String type, DirectWriter writer) throws IOException {
+    return writeDirect(MessageTypeParser.parseMessageType(type), writer);
+  }
+
+  private Path writeDirect(String type, DirectWriter writer,
+                           Map<String, String> metadata) throws IOException {
+    return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
+  }
+
+  private Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
+    return writeDirect(type, writer, new HashMap<String, String>());
+  }
+
+  private Path writeDirect(MessageType type, DirectWriter writer,
+                           Map<String, String> metadata) throws IOException {
+    // create a placeholder to reserve a unique name, then delete it so the
+    // ParquetWriter can create the file itself
+    File temp = tempDir.newFile(UUID.randomUUID().toString());
+    temp.deleteOnExit();
+    temp.delete();
+
+    Path path = new Path(temp.getPath());
+
+    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
+        path, new DirectWriteSupport(type, writer, metadata));
+    parquetWriter.write(null);
+    parquetWriter.close();
+
+    return path;
+  }
+
+  public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
+      Path path) throws IOException {
+    return new AvroParquetReader<T>(path);
+  }
+
+  public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
+      Path path) throws IOException {
+    return new AvroParquetReader<T>(NEW_BEHAVIOR_CONF, path);
+  }
+
+  public <T extends IndexedRecord> void assertReaderContains(
+      AvroParquetReader<T> reader, Schema expectedSchema, T... expectedRecords)
+      throws IOException {
+    for (T expectedRecord : expectedRecords) {
+      T actualRecord = reader.read();
+      Assert.assertEquals("Should match expected schema",
+          expectedSchema, actualRecord.getSchema());
+      Assert.assertEquals("Should match the expected record",
+          expectedRecord, actualRecord);
+    }
+    Assert.assertNull("Should only contain " + expectedRecords.length +
+            " record" + (expectedRecords.length == 1 ? "" : "s"),
+        reader.read());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroDataSupplier.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroDataSupplier.java
new file mode 100644
index 0000000..d2a40d4
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroDataSupplier.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAvroDataSupplier {
+
+  public static class GenericDataSupplier implements AvroDataSupplier {
+    @Override
+    public GenericData get() {
+      return GenericData.get();
+    }
+  }
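+
+  // setAvroDataSupplier is expected to record the supplier class name in the
+  // Configuration so the read path can instantiate it reflectively to obtain
+  // the GenericData model; the test below only checks the stored name.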
+
+  @Test
+  public void testSetSupplierMethod() {
+    Configuration conf = new Configuration(false);
+    AvroReadSupport.setAvroDataSupplier(conf, GenericDataSupplier.class);
+    Assert.assertEquals("Should contain the class name",
+        "org.apache.parquet.avro.TestAvroDataSupplier$GenericDataSupplier",
+        conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
new file mode 100644
index 0000000..f8de8c2
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -0,0 +1,256 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.codehaus.jackson.node.NullNode;
+import org.junit.Test;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestAvroSchemaConverter {
+
+  public static final String ALL_PARQUET_SCHEMA =
+      "message org.apache.parquet.avro.myrecord {\n" +
+      "  required boolean myboolean;\n" +
+      "  required int32 myint;\n" +
+      "  required int64 mylong;\n" +
+      "  required float myfloat;\n" +
+      "  required double mydouble;\n" +
+      "  required binary mybytes;\n" +
+      "  required binary mystring (UTF8);\n" +
+      "  required group mynestedrecord {\n" +
+      "    required int32 mynestedint;\n" +
+      "  }\n" +
+      "  required binary myenum (ENUM);\n" +
+      "  required group myarray (LIST) {\n" +
+      "    repeated int32 array;\n" +
+      "  }\n" +
+      "  optional group myoptionalarray (LIST) {\n" +
+      "    repeated int32 array;\n" +
+      "  }\n" +
+      "  required group myrecordarray (LIST) {\n" +
+      "    repeated group array {\n" +
+      "      required int32 a;\n" +
+      "      required int32 b;\n" +
+      "    }\n" +
+      "  }\n" +
+      "  required group mymap (MAP) {\n" +
+      "    repeated group map (MAP_KEY_VALUE) {\n" +
+      "      required binary key (UTF8);\n" +
+      "      required int32 value;\n" +
+      "    }\n" +
+      "  }\n" +
+      "  required fixed_len_byte_array(1) myfixed;\n" +
+      "}\n";
+
+  private void testAvroToParquetConversion(Schema avroSchema, String schemaString) throws
+      Exception {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+    MessageType schema = avroSchemaConverter.convert(avroSchema);
+    MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
+    assertEquals("converting " + schema + " to " + schemaString, expectedMT.toString(),
+        schema.toString());
+  }
+
+  private void testParquetToAvroConversion(Schema avroSchema, String schemaString) throws
+      Exception {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+    Schema schema = avroSchemaConverter.convert(
+        MessageTypeParser.parseMessageType(schemaString));
+    assertEquals("converting " + schemaString + " to " + avroSchema, avroSchema.toString(),
+        schema.toString());
+  }
+
+  private void testRoundTripConversion(Schema avroSchema, String schemaString) throws
+      Exception {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+    MessageType schema = avroSchemaConverter.convert(avroSchema);
+    MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
+    assertEquals("converting " + schema + " to " + schemaString, expectedMT.toString(),
+        schema.toString());
+    Schema convertedAvroSchema = avroSchemaConverter.convert(expectedMT);
+    assertEquals("converting " + expectedMT + " to " + avroSchema.toString(true),
+        avroSchema.toString(), convertedAvroSchema.toString());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testTopLevelMustBeARecord() {
+    new AvroSchemaConverter().convert(Schema.create(Schema.Type.INT));
+  }
+
+  @Test
+  public void testAllTypes() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("all.avsc").openStream());
+    testAvroToParquetConversion(
+        schema,
+        "message org.apache.parquet.avro.myrecord {\n" +
+            // Avro nulls are not encoded, unless they are null unions
+            "  required boolean myboolean;\n" +
+            "  required int32 myint;\n" +
+            "  required int64 mylong;\n" +
+            "  required float myfloat;\n" +
+            "  required double mydouble;\n" +
+            "  required binary mybytes;\n" +
+            "  required binary mystring (UTF8);\n" +
+            "  required group mynestedrecord {\n" +
+            "    required int32 mynestedint;\n" +
+            "  }\n" +
+            "  required binary myenum (ENUM);\n" +
+            "  required group myarray (LIST) {\n" +
+            "    repeated int32 array;\n" +
+            "  }\n" +
+            "  required group myemptyarray (LIST) {\n" +
+            "    repeated int32 array;\n" +
+            "  }\n" +
+            "  optional group myoptionalarray (LIST) {\n" +
+            "    repeated int32 array;\n" +
+            "  }\n" +
+            "  required group mymap (MAP) {\n" +
+            "    repeated group map (MAP_KEY_VALUE) {\n" +
+            "      required binary key (UTF8);\n" +
+            "      required int32 value;\n" +
+            "    }\n" +
+            "  }\n" +
+            "  required group myemptymap (MAP) {\n" +
+            "    repeated group map (MAP_KEY_VALUE) {\n" +
+            "      required binary key (UTF8);\n" +
+            "      required int32 value;\n" +
+            "    }\n" +
+            "  }\n" +
+            "  required fixed_len_byte_array(1) myfixed;\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testAllTypesParquetToAvro() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("allFromParquet.avsc").openStream());
+    // Cannot use round-trip assertion because enum is lost
+    testParquetToAvroConversion(schema, ALL_PARQUET_SCHEMA);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testParquetMapWithNonStringKeyFails() throws Exception {
+    MessageType parquetSchema = MessageTypeParser.parseMessageType(
+      "message myrecord {\n" +
+        "  required group mymap (MAP) {\n" +
+        "    repeated group map (MAP_KEY_VALUE) {\n" +
+        "      required int32 key;\n" +
+        "      required int32 value;\n" +
+        "    }\n" +
+        "  }\n" +
+        "}\n"
+    );
+    new AvroSchemaConverter().convert(parquetSchema);
+  }
+
+  @Test
+  public void testOptionalFields() throws Exception {
+    Schema schema = Schema.createRecord("record1", null, null, false);
+    Schema optionalInt = optional(Schema.create(Schema.Type.INT));
+    schema.setFields(Arrays.asList(
+        new Schema.Field("myint", optionalInt, null, NullNode.getInstance())
+    ));
+    testRoundTripConversion(
+        schema,
+        "message record1 {\n" +
+            "  optional int32 myint;\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testOptionalMapValue() throws Exception {
+    Schema schema = Schema.createRecord("record1", null, null, false);
+    Schema optionalIntMap = Schema.createMap(optional(Schema.create(Schema.Type.INT)));
+    schema.setFields(Arrays.asList(
+        new Schema.Field("myintmap", optionalIntMap, null, null)
+    ));
+    testRoundTripConversion(
+        schema,
+        "message record1 {\n" +
+            "  required group myintmap (MAP) {\n" +
+            "    repeated group map (MAP_KEY_VALUE) {\n" +
+            "      required binary key (UTF8);\n" +
+            "      optional int32 value;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testUnionOfTwoTypes() throws Exception {
+    Schema schema = Schema.createRecord("record2", null, null, false);
+    Schema multipleTypes = Schema.createUnion(Arrays.asList(
+        Schema.create(Schema.Type.NULL),
+        Schema.create(Schema.Type.INT),
+        Schema.create(Schema.Type.FLOAT)));
+    schema.setFields(Arrays.asList(
+        new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance())));
+
+    // Avro union is modelled using optional data members of the different
+    // types. This does not translate back into an Avro union
+    testAvroToParquetConversion(
+        schema,
+        "message record2 {\n" +
+            "  optional group myunion {\n" +
+            "    optional int32 member0;\n" +
+            "    optional float member1;\n" +
+            "  }\n" +
+            "}\n");
+  }
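+
+  // For reference: of the union branches above, only NULL maps to
+  // optionality; each remaining branch becomes a generated memberN field,
+  // which is why the expected schema names them member0 and member1.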
+
+  @Test
+  public void testArrayOfOptionalRecords() throws Exception {
+    Schema innerRecord = Schema.createRecord("InnerRecord", null, null, false);
+    Schema optionalString = optional(Schema.create(Schema.Type.STRING));
+    innerRecord.setFields(Lists.newArrayList(
+        new Schema.Field("s1", optionalString, null, NullNode.getInstance()),
+        new Schema.Field("s2", optionalString, null, NullNode.getInstance())
+    ));
+    Schema schema = Schema.createRecord("HasArray", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("myarray", Schema.createArray(optional(innerRecord)),
+            null, NullNode.getInstance())
+    ));
+    System.err.println("Avro schema: " + schema.toString(true));
+
+    // Cannot use round-trip assertion because the optional wrapper around
+    // InnerRecord is not preserved
+    testAvroToParquetConversion(schema, "message HasArray {\n" +
+        "  required group myarray (LIST) {\n" +
+        "    repeated group array {\n" +
+        "      optional binary s1 (UTF8);\n" +
+        "      optional binary s2 (UTF8);\n" +
+        "    }\n" +
+        "  }\n" +
+        "}\n");
+  }
+
+  public static Schema optional(Schema original) {
+    return Schema.createUnion(Lists.newArrayList(
+        Schema.create(Schema.Type.NULL),
+        original));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
new file mode 100644
index 0000000..aae11a7
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.io.Resources;
+import java.io.IOException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.parquet.hadoop.ParquetReader;
+
+public class TestBackwardCompatibility {
+
+  @Test
+  public void testStringCompatibility() throws IOException {
+    // some older versions of Parquet used avro.schema instead of
+    // parquet.avro.schema and didn't annotate binary with UTF8 when the type
+    // was converted from an Avro string. this validates that the old read
+    // schema is recognized and used to read the file as expected.
+    Path testFile = new Path(Resources.getResource("strings-2.parquet").getFile());
+    Configuration conf = new Configuration();
+    ParquetReader<GenericRecord> reader = AvroParquetReader
+        .builder(new AvroReadSupport<GenericRecord>(), testFile)
+        .withConf(conf)
+        .build();
+    GenericRecord r;
+    while ((r = reader.read()) != null) {
+      Assert.assertTrue("Should read value into a String",
+          r.get("text") instanceof String);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java
new file mode 100644
index 0000000..36c090f
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java
@@ -0,0 +1,144 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Test;
+import org.apache.parquet.Log;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestInputOutputFormat {
+  private static final Log LOG = Log.getLog(TestInputOutputFormat.class);
+
+  private static Schema avroSchema;
+  static {
+    avroSchema = Schema.createRecord("record1", null, null, false);
+    avroSchema.setFields(
+        Arrays.asList(new Schema.Field("a",
+            Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.NULL))),
+            null, null)));
+  }
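+
+  // For reference, the schema built above corresponds to the Avro JSON:
+  //   {"type": "record", "name": "record1", "fields": [
+  //     {"name": "a", "type": ["int", "null"]}]}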
+
+  public static GenericRecord nextRecord(Integer i) {
+    return new GenericRecordBuilder(avroSchema).set("a", i).build();
+  }
+
+  public static class MyMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {
+
+    public void run(Context context) throws IOException, InterruptedException {
+      for (int i = 0; i < 10; i++) {
+        GenericRecord a;
+        a = TestInputOutputFormat.nextRecord(i == 4 ? null : i);
+        context.write(null, a);
+      }
+    }
+  }
+
+  public static class MyMapper2 extends Mapper<Void, GenericRecord, LongWritable, Text> {
+    protected void map(Void key, GenericRecord value, Context context) throws IOException, InterruptedException {
+      context.write(null, new Text(value.toString()));
+    }
+
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+
+    final Configuration conf = new Configuration();
+    final Path inputPath = new Path("src/test/java/org/apache/parquet/avro/TestInputOutputFormat.java");
+    final Path parquetPath = new Path("target/test/hadoop/TestInputOutputFormat/parquet");
+    final Path outputPath = new Path("target/test/hadoop/TestInputOutputFormat/out");
+    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    fileSystem.delete(outputPath, true);
+    {
+      final Job job = new Job(conf, "write");
+
+      // input not really used
+      TextInputFormat.addInputPath(job, inputPath);
+      job.setInputFormatClass(TextInputFormat.class);
+
+      job.setMapperClass(TestInputOutputFormat.MyMapper.class);
+      job.setNumReduceTasks(0);
+
+      job.setOutputFormatClass(AvroParquetOutputFormat.class);
+      AvroParquetOutputFormat.setOutputPath(job, parquetPath);
+      AvroParquetOutputFormat.setSchema(job, avroSchema);
+
+      waitForJob(job);
+    }
+    {
+      final Job job = new Job(conf, "read");
+      job.setInputFormatClass(AvroParquetInputFormat.class);
+      AvroParquetInputFormat.setInputPaths(job, parquetPath);
+
+      job.setMapperClass(TestInputOutputFormat.MyMapper2.class);
+      job.setNumReduceTasks(0);
+
+      job.setOutputFormatClass(TextOutputFormat.class);
+      TextOutputFormat.setOutputPath(job, outputPath);
+
+      waitForJob(job);
+    }
+
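+    // Each output line is "key<TAB>value"; strip the tab-delimited key before
+    // comparing against the expected record's toString().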
+    final BufferedReader out = new BufferedReader(new FileReader(new File(outputPath.toString(), "part-m-00000")));
+    String lineOut = null;
+    int lineNumber = 0;
+    while ((lineOut = out.readLine()) != null) {
+      lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
+      GenericRecord a = nextRecord(lineNumber == 4 ? null : lineNumber);
+      assertEquals("line " + lineNumber, a.toString(), lineOut);
+      ++lineNumber;
+    }
+    assertNull("line " + lineNumber, out.readLine());
+    out.close();
+  }
+
+  private void waitForJob(Job job) throws Exception {
+    job.submit();
+    while (!job.isComplete()) {
+      LOG.debug("waiting for job " + job.getJobName());
+      sleep(100);
+    }
+    LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("job failed " + job.getJobName());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
new file mode 100644
index 0000000..f7d00c6
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -0,0 +1,460 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestReadWrite {
+
+  @Test
+  public void testEmptyArray() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("array.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer = 
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with an empty array.
+    List<Integer> emptyArray = new ArrayList<Integer>();
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("myarray", emptyArray).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(emptyArray, nextRecord.get("myarray"));
+  }
+
+  @Test
+  public void testEmptyMap() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer = 
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with an empty map.
+    Map<String, Integer> emptyMap = ImmutableMap.of();
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", emptyMap).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(emptyMap, nextRecord.get("mymap"));
+  }
+
+  @Test
+  public void testMapWithNulls() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map_with_nulls.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a null value
+    Map<String, Integer> map = new HashMap<String, Integer>();
+    map.put("thirty-four", 34);
+    map.put("eleventy-one", null);
+    map.put("one-hundred", 100);
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", map).build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(map, nextRecord.get("mymap"));
+  }
+
+  @Test(expected=RuntimeException.class)
+  public void testMapRequiredValueWithNull() throws Exception {
+    Schema schema = Schema.createRecord("record1", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)));
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a null value
+    Map<String, Integer> map = new HashMap<String, Integer>();
+    map.put("thirty-four", 34);
+    map.put("eleventy-one", null);
+    map.put("one-hundred", 100);
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", map).build();
+    writer.write(record);
+  }
+
+  @Test
+  public void testMapWithUtf8Key() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("map.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer = 
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    // Write a record with a map with Utf8 keys.
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mymap", ImmutableMap.of(new Utf8("a"), 1, new Utf8("b"), 2))
+        .build();
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+  }
+
+  @Test
+  public void testAll() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("all.avsc").openStream());
+
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    AvroParquetWriter<GenericRecord> writer =
+        new AvroParquetWriter<GenericRecord>(file, schema);
+
+    GenericData.Record nestedRecord = new GenericRecordBuilder(
+        schema.getField("mynestedrecord").schema())
+            .set("mynestedint", 1).build();
+
+    List<Integer> integerArray = Arrays.asList(1, 2, 3);
+    GenericData.Array<Integer> genericIntegerArray = new GenericData.Array<Integer>(
+        Schema.createArray(Schema.create(Schema.Type.INT)), integerArray);
+
+    GenericFixed genericFixed = new GenericData.Fixed(
+        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
+
+    List<Integer> emptyArray = new ArrayList<Integer>();
+    Map<String, Integer> emptyMap = ImmutableMap.of();
+
+    GenericData.Record record = new GenericRecordBuilder(schema)
+        .set("mynull", null)
+        .set("myboolean", true)
+        .set("myint", 1)
+        .set("mylong", 2L)
+        .set("myfloat", 3.1f)
+        .set("mydouble", 4.1)
+        .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
+        .set("mystring", "hello")
+        .set("mynestedrecord", nestedRecord)
+        .set("myenum", "a")
+        .set("myarray", genericIntegerArray)
+        .set("myemptyarray", emptyArray)
+        .set("myoptionalarray", genericIntegerArray)
+        .set("mymap", ImmutableMap.of("a", 1, "b", 2))
+        .set("myemptymap", emptyMap)
+        .set("myfixed", genericFixed)
+        .build();
+
+    writer.write(record);
+    writer.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertEquals(null, nextRecord.get("mynull"));
+    assertEquals(true, nextRecord.get("myboolean"));
+    assertEquals(1, nextRecord.get("myint"));
+    assertEquals(2L, nextRecord.get("mylong"));
+    assertEquals(3.1f, nextRecord.get("myfloat"));
+    assertEquals(4.1, nextRecord.get("mydouble"));
+    assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
+    assertEquals("hello", nextRecord.get("mystring"));
+    assertEquals("a", nextRecord.get("myenum"));
+    assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
+    assertEquals(integerArray, nextRecord.get("myarray"));
+    assertEquals(emptyArray, nextRecord.get("myemptyarray"));
+    assertEquals(integerArray, nextRecord.get("myoptionalarray"));
+    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+    assertEquals(emptyMap, nextRecord.get("myemptymap"));
+    assertEquals(genericFixed, nextRecord.get("myfixed"));
+  }
+
+  @Test
+  public void testAllUsingDefaultAvroSchema() throws Exception {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path file = new Path(tmp.getPath());
+
+    // write file using Parquet APIs
+    ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<Map<String, Object>>(file,
+        new WriteSupport<Map<String, Object>>() {
+
+      private RecordConsumer recordConsumer;
+
+      @Override
+      public WriteContext init(Configuration configuration) {
+        return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
+            new HashMap<String, String>());
+      }
+
+      @Override
+      public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.recordConsumer = recordConsumer;
+      }
+
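+      // Writes one record with the low-level RecordConsumer API: every field is
+      // bracketed by startField/endField, and nested structures by startGroup/endGroup.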
+      @Override
+      public void write(Map<String, Object> record) {
+        recordConsumer.startMessage();
+
+        int index = 0;
+
+        recordConsumer.startField("myboolean", index);
+        recordConsumer.addBoolean((Boolean) record.get("myboolean"));
+        recordConsumer.endField("myboolean", index++);
+
+        recordConsumer.startField("myint", index);
+        recordConsumer.addInteger((Integer) record.get("myint"));
+        recordConsumer.endField("myint", index++);
+
+        recordConsumer.startField("mylong", index);
+        recordConsumer.addLong((Long) record.get("mylong"));
+        recordConsumer.endField("mylong", index++);
+
+        recordConsumer.startField("myfloat", index);
+        recordConsumer.addFloat((Float) record.get("myfloat"));
+        recordConsumer.endField("myfloat", index++);
+
+        recordConsumer.startField("mydouble", index);
+        recordConsumer.addDouble((Double) record.get("mydouble"));
+        recordConsumer.endField("mydouble", index++);
+
+        recordConsumer.startField("mybytes", index);
+        recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
+        recordConsumer.endField("mybytes", index++);
+
+        recordConsumer.startField("mystring", index);
+        recordConsumer.addBinary(Binary.fromString((String) record.get("mystring")));
+        recordConsumer.endField("mystring", index++);
+
+        recordConsumer.startField("mynestedrecord", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("mynestedint", 0);
+        recordConsumer.addInteger((Integer) record.get("mynestedint"));
+        recordConsumer.endField("mynestedint", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("mynestedrecord", index++);
+
+        recordConsumer.startField("myenum", index);
+        recordConsumer.addBinary(Binary.fromString((String) record.get("myenum")));
+        recordConsumer.endField("myenum", index++);
+
+        recordConsumer.startField("myarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        for (int val : (int[]) record.get("myarray")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myarray", index++);
+
+        recordConsumer.startField("myoptionalarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        for (int val : (int[]) record.get("myoptionalarray")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myoptionalarray", index++);
+
+        recordConsumer.startField("myrecordarray", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("array", 0);
+        recordConsumer.startGroup();
+        recordConsumer.startField("a", 0);
+        for (int val : (int[]) record.get("myrecordarraya")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("a", 0);
+        recordConsumer.startField("b", 1);
+        for (int val : (int[]) record.get("myrecordarrayb")) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("b", 1);
+        recordConsumer.endGroup();
+        recordConsumer.endField("array", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("myrecordarray", index++);
+
+        recordConsumer.startField("mymap", index);
+        recordConsumer.startGroup();
+        recordConsumer.startField("map", 0);
+        recordConsumer.startGroup();
+        Map<String, Integer> mymap = (Map<String, Integer>) record.get("mymap");
+        recordConsumer.startField("key", 0);
+        for (String key : mymap.keySet()) {
+          recordConsumer.addBinary(Binary.fromString(key));
+        }
+        recordConsumer.endField("key", 0);
+        recordConsumer.startField("value", 1);
+        for (int val : mymap.values()) {
+          recordConsumer.addInteger(val);
+        }
+        recordConsumer.endField("value", 1);
+        recordConsumer.endGroup();
+        recordConsumer.endField("map", 0);
+        recordConsumer.endGroup();
+        recordConsumer.endField("mymap", index++);
+
+        recordConsumer.startField("myfixed", index);
+        recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
+        recordConsumer.endField("myfixed", index++);
+
+        recordConsumer.endMessage();
+      }
+    });
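+    // Populate the flat map consumed by the anonymous WriteSupport above.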
+    Map<String, Object> record = new HashMap<String, Object>();
+    record.put("myboolean", true);
+    record.put("myint", 1);
+    record.put("mylong", 2L);
+    record.put("myfloat", 3.1f);
+    record.put("mydouble", 4.1);
+    record.put("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)));
+    record.put("mystring", "hello");
+    record.put("myenum", "a");
+    record.put("mynestedint", 1);
+    record.put("myarray", new int[] {1, 2, 3});
+    record.put("myoptionalarray", new int[]{1, 2, 3});
+    record.put("myrecordarraya", new int[] {1, 2, 3});
+    record.put("myrecordarrayb", new int[] {4, 5, 6});
+    record.put("mymap", ImmutableMap.of("a", 1, "b", 2));
+    record.put("myfixed", new byte[] { (byte) 65 });
+    parquetWriter.write(record);
+    parquetWriter.close();
+
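+    // Build the Avro values expected from reading the file back with the
+    // default Avro schema conversion.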
+    Schema nestedRecordSchema = Schema.createRecord("mynestedrecord", null, null, false);
+    nestedRecordSchema.setFields(Arrays.asList(
+        new Schema.Field("mynestedint", Schema.create(Schema.Type.INT), null, null)
+    ));
+    GenericData.Record nestedRecord = new GenericRecordBuilder(nestedRecordSchema)
+        .set("mynestedint", 1).build();
+
+    List<Integer> integerArray = Arrays.asList(1, 2, 3);
+
+    Schema recordArraySchema = Schema.createRecord("array", null, null, false);
+    recordArraySchema.setFields(Arrays.asList(
+        new Schema.Field("a", Schema.create(Schema.Type.INT), null, null),
+        new Schema.Field("b", Schema.create(Schema.Type.INT), null, null)
+    ));
+    GenericRecordBuilder builder = new GenericRecordBuilder(recordArraySchema);
+    List<GenericData.Record> recordArray = new ArrayList<GenericData.Record>();
+    recordArray.add(builder.set("a", 1).set("b", 4).build());
+    recordArray.add(builder.set("a", 2).set("b", 5).build());
+    recordArray.add(builder.set("a", 3).set("b", 6).build());
+    GenericData.Array<GenericData.Record> genericRecordArray = new GenericData.Array<GenericData.Record>(
+        Schema.createArray(recordArraySchema), recordArray);
+
+    GenericFixed genericFixed = new GenericData.Fixed(
+        Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    GenericRecord nextRecord = reader.read();
+    assertNotNull(nextRecord);
+    assertEquals(true, nextRecord.get("myboolean"));
+    assertEquals(1, nextRecord.get("myint"));
+    assertEquals(2L, nextRecord.get("mylong"));
+    assertEquals(3.1f, nextRecord.get("myfloat"));
+    assertEquals(4.1, nextRecord.get("mydouble"));
+    assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
+    assertEquals("hello", nextRecord.get("mystring"));
+    assertEquals("a", nextRecord.get("myenum"));
+    assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
+    assertEquals(integerArray, nextRecord.get("myarray"));
+    assertEquals(integerArray, nextRecord.get("myoptionalarray"));
+    assertEquals(genericRecordArray, nextRecord.get("myrecordarray"));
+    assertEquals(ImmutableMap.of("a", 1, "b", 2), nextRecord.get("mymap"));
+    assertEquals(genericFixed, nextRecord.get("myfixed"));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java
new file mode 100644
index 0000000..17a0af1
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java
@@ -0,0 +1,286 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.filter.ColumnPredicates;
+import org.apache.parquet.filter.ColumnRecordFilter;
+import org.apache.parquet.filter.RecordFilter;
+import org.apache.parquet.filter.UnboundRecordFilter;
+
+public class TestSpecificInputOutputFormat {
+  private static final Log LOG = Log.getLog(TestSpecificInputOutputFormat.class);
+
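+  // Builds a deterministic Car: even-numbered cars get ELECTRIC engines, odd
+  // ones PETROL, and every fourth car has a service history entry.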
+  public static Car nextRecord(int i) {
+    String vin = "1VXBR12EXCP000000";
+    Car.Builder carBuilder = Car.newBuilder()
+        .setDoors(2)
+        .setMake("Tesla")
+        .setModel(String.format("Model X v%d", i % 2))
+        .setVin(new Vin(vin.getBytes()))
+        .setYear(2014 + i)
+        .setOptionalExtra(LeatherTrim.newBuilder().setColour("black").build())
+        .setRegistration("California");
+    Engine.Builder engineBuilder = Engine.newBuilder()
+        .setCapacity(85.0f)
+        .setHasTurboCharger(false);
+    if (i % 2 == 0) {
+      engineBuilder.setType(EngineType.ELECTRIC);
+    } else {
+      engineBuilder.setType(EngineType.PETROL);
+    }
+    carBuilder.setEngine(engineBuilder.build());
+    if (i % 4 == 0) {
+      List<Service> serviceList = Lists.newArrayList();
+      serviceList.add(Service.newBuilder()
+          .setDate(1374084640)
+          .setMechanic("Elon Musk").build());
+      carBuilder.setServiceHistory(serviceList);
+    }
+    return carBuilder.build();
+  }
+
+  public static class MyMapper extends Mapper<LongWritable, Text, Void, Car> {
+
+    @Override
+    public void run(Context context) throws IOException, InterruptedException {
+      for (int i = 0; i < 10; i++) {
+        context.write(null, nextRecord(i));
+      }
+    }
+  }
+
+  public static class MyMapper2 extends Mapper<Void, Car, Void, Car> {
+    @Override
+    protected void map(Void key, Car car, Context context) throws IOException, InterruptedException {
+      // Note: Car can be null because of predicate pushdown defined by an UnboundRecordFilter (see below)
+      if (car != null) {
+        context.write(null, car);
+      }
+    }
+
+  }
+
+  public static class MyMapperShort extends
+      Mapper<Void, ShortCar, Void, ShortCar> {
+    @Override
+    protected void map(Void key, ShortCar car, Context context)
+        throws IOException, InterruptedException {
+      // Note: ShortCar can be null because of predicate pushdown defined by
+      // an UnboundRecordFilter (see below)
+      if (car != null) {
+        context.write(null, car);
+      }
+    }
+
+  }
+
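+  // Record filter used for predicate push-down: keeps only cars whose
+  // engine.type column equals ELECTRIC.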
+  public static class ElectricCarFilter implements UnboundRecordFilter {
+
+    private final UnboundRecordFilter filter;
+
+    public ElectricCarFilter() {
+      filter = ColumnRecordFilter.column("engine.type", ColumnPredicates.equalTo(EngineType.ELECTRIC));
+    }
+
+    @Override
+    public RecordFilter bind(Iterable<ColumnReader> readers) {
+      return filter.bind(readers);
+    }
+  }
+
+  final Configuration conf = new Configuration();
+  final Path inputPath = new Path("src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java");
+  final Path parquetPath = new Path("target/test/hadoop/TestSpecificInputOutputFormat/parquet");
+  final Path outputPath = new Path("target/test/hadoop/TestSpecificInputOutputFormat/out");
+
+  @Before
+  public void createParquetFile() throws Exception {
+    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    fileSystem.delete(outputPath, true);
+    {
+      final Job job = new Job(conf, "write");
+
+      // input not really used
+      TextInputFormat.addInputPath(job, inputPath);
+      job.setInputFormatClass(TextInputFormat.class);
+
+      job.setMapperClass(TestSpecificInputOutputFormat.MyMapper.class);
+      job.setNumReduceTasks(0);
+
+      job.setOutputFormatClass(AvroParquetOutputFormat.class);
+      AvroParquetOutputFormat.setOutputPath(job, parquetPath);
+      AvroParquetOutputFormat.setSchema(job, Car.SCHEMA$);
+
+      waitForJob(job);
+    }
+  }
+  
+  @Test
+  public void testReadWrite() throws Exception {
+
+    final Job job = new Job(conf, "read");
+    job.setInputFormatClass(AvroParquetInputFormat.class);
+    AvroParquetInputFormat.setInputPaths(job, parquetPath);
+    // Test push-down predicates by using an electric car filter
+    AvroParquetInputFormat.setUnboundRecordFilter(job, ElectricCarFilter.class);
+
+    // Test schema projection by dropping the optional extras
+    Schema projection = Schema.createRecord(Car.SCHEMA$.getName(),
+        Car.SCHEMA$.getDoc(), Car.SCHEMA$.getNamespace(), false);
+    List<Schema.Field> fields = Lists.newArrayList();
+    for (Schema.Field field : Car.SCHEMA$.getFields()) {
+      if (!"optionalExtra".equals(field.name())) {
+        fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
+            field.defaultValue(), field.order()));
+      }
+    }
+    projection.setFields(fields);
+    AvroParquetInputFormat.setRequestedProjection(job, projection);
+
+    job.setMapperClass(TestSpecificInputOutputFormat.MyMapper2.class);
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AvroParquetOutputFormat.class);
+    AvroParquetOutputFormat.setOutputPath(job, outputPath);
+    AvroParquetOutputFormat.setSchema(job, Car.SCHEMA$);
+
+    waitForJob(job);
+
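+    // Read the mapper output back directly with AvroParquetReader to verify
+    // that both the filter and the projection were applied.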
+    final Path mapperOutput = new Path(outputPath.toString(),
+        "part-m-00000.parquet");
+    final AvroParquetReader<Car> out = new AvroParquetReader<Car>(mapperOutput);
+    Car car;
+    Car previousCar = null;
+    int lineNumber = 0;
+    while ((car = out.read()) != null) {
+      if (previousCar != null) {
+        // Testing reference equality here: the "model" field should be
+        // dictionary-encoded, so equal strings come back as the same object.
+        assertTrue(car.getModel() == previousCar.getModel());
+      }
+      // Make sure that predicate push down worked as expected
+      if (car.getEngine().getType() == EngineType.PETROL) {
+        fail("UnboundRecordFilter failed to remove cars with PETROL engines");
+      }
+      // Note we use lineNumber * 2 because push-down kept only the even-numbered (electric) cars
+      Car expectedCar = nextRecord(lineNumber * 2);
+      // We removed the optional extra field using projection so we shouldn't
+      // see it here...
+      expectedCar.setOptionalExtra(null);
+      assertEquals("line " + lineNumber, expectedCar, car);
+      ++lineNumber;
+      previousCar = car;
+    }
+    out.close();
+  }
+
+  @Test
+  public void testReadWriteChangedCar() throws Exception {
+
+    final Job job = new Job(conf, "read changed/short");
+    job.setInputFormatClass(AvroParquetInputFormat.class);
+    AvroParquetInputFormat.setInputPaths(job, parquetPath);
+    // Test push-down predicates by using an electric car filter
+    AvroParquetInputFormat.setUnboundRecordFilter(job, ElectricCarFilter.class);
+
+    // Test schema projection by keeping only the engine, year, and vin fields
+    // (like ShortCar), dropping make so it reads back as null
+    Schema projection = Schema.createRecord(Car.SCHEMA$.getName(),
+        Car.SCHEMA$.getDoc(), Car.SCHEMA$.getNamespace(), false);
+    List<Schema.Field> fields = Lists.newArrayList();
+    for (Schema.Field field : Car.SCHEMA$.getFields()) {
+      // No make!
+      if ("engine".equals(field.name()) || "year".equals(field.name()) || "vin".equals(field.name())) {
+        fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
+            field.defaultValue(), field.order()));
+      }
+    }
+    projection.setFields(fields);
+    AvroParquetInputFormat.setRequestedProjection(job, projection);
+    AvroParquetInputFormat.setAvroReadSchema(job, ShortCar.SCHEMA$);
+
+    job.setMapperClass(TestSpecificInputOutputFormat.MyMapperShort.class);
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AvroParquetOutputFormat.class);
+    AvroParquetOutputFormat.setOutputPath(job, outputPath);
+    AvroParquetOutputFormat.setSchema(job, ShortCar.SCHEMA$);
+
+    waitForJob(job);
+
+    final Path mapperOutput = new Path(outputPath.toString(), "part-m-00000.parquet");
+    final AvroParquetReader<ShortCar> out = new AvroParquetReader<ShortCar>(mapperOutput);
+    ShortCar car;
+    int lineNumber = 0;
+    while ((car = out.read()) != null) {
+      // Make sure that predicate push down worked as expected: only the
+      // even-numbered (electric) cars remain, hence lineNumber * 2
+      Car expectedCar = nextRecord(lineNumber * 2);
+      // The projection dropped the make field, so it should read back as null
+      assertNull(car.getMake());
+      assertEquals(car.getEngine(), expectedCar.getEngine());
+      assertEquals(car.getYear(), expectedCar.getYear());
+      assertEquals(car.getVin(), expectedCar.getVin());
+      ++lineNumber;
+    }
+    out.close();
+  }
+
+  private void waitForJob(Job job) throws Exception {
+    job.submit();
+    while (!job.isComplete()) {
+      LOG.debug("waiting for job " + job.getJobName());
+      sleep(100);
+    }
+    LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("job failed " + job.getJobName());
+    }
+  }
+
+  @After
+  public void deleteOutputFile() throws IOException {
+    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    fileSystem.delete(outputPath, true);
+  }
+}

