parquet-commits mailing list archives

From b...@apache.org
Subject [47/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:44 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
new file mode 100644
index 0000000..f01f009
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -0,0 +1,288 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.apache.parquet.filter.ColumnPredicates.equalTo;
+import static org.apache.parquet.filter.ColumnRecordFilter.column;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * Other tests exercise the use of Avro Generic, a dynamic data representation. This class focuses
+ * on Avro Specific, whose schemas are pre-compiled to POJOs with built-in SerDe for faster serialization.
+ */
+public class TestSpecificReadWrite {
+
+  @Test
+  public void testReadWriteSpecific() throws IOException {
+    Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path);
+    for (int i = 0; i < 10; i++) {
+      assertEquals(getVwPolo().toString(), reader.read().toString());
+      assertEquals(getVwPassat().toString(), reader.read().toString());
+      assertEquals(getBmwMini().toString(), reader.read().toString());
+    }
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testReadWriteSpecificWithDictionary() throws IOException {
+    Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, true);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path);
+    for (int i = 0; i < 10; i++) {
+      assertEquals(getVwPolo().toString(), reader.read().toString());
+      assertEquals(getVwPassat().toString(), reader.read().toString());
+      assertEquals(getBmwMini().toString(), reader.read().toString());
+    }
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testFilterMatchesMultiple() throws IOException {
+    Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    for (int i = 0; i < 10; i++) {
+      assertEquals(getVwPolo().toString(), reader.read().toString());
+      assertEquals(getVwPassat().toString(), reader.read().toString());
+    }
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testFilterMatchesMultipleBlocks() throws IOException {
+    Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    for (int i = 0; i < 10000; i++) {
+      assertEquals(getVwPolo().toString(), reader.read().toString());
+      assertEquals(getVwPassat().toString(), reader.read().toString());
+    }
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testFilterMatchesNoBlocks() throws IOException {
+    Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Bogus")));
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testFilterMatchesFinalBlockOnly() throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path path = new Path(tmp.getPath());
+
+    Car vwPolo   = getVwPolo();
+    Car vwPassat = getVwPassat();
+    Car bmwMini  = getBmwMini();
+
+    ParquetWriter<Car> writer = new AvroParquetWriter<Car>(path, Car.SCHEMA$,
+        CompressionCodecName.UNCOMPRESSED, DEFAULT_BLOCK_SIZE/128, DEFAULT_PAGE_SIZE/128,
+        false);
+    for (int i = 0; i < 10000; i++) {
+      writer.write(vwPolo);
+      writer.write(vwPassat);
+      writer.write(vwPolo);
+    }
+    writer.write(bmwMini); // only write BMW in last block
+    writer.close();
+
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make",
+        equalTo("BMW")));
+    assertEquals(getBmwMini().toString(), reader.read().toString());
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testFilterWithDictionary() throws IOException {
+    Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, true);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    assertEquals(getVwPolo().toString(), reader.read().toString());
+    assertEquals(getVwPassat().toString(), reader.read().toString());
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testFilterOnSubAttribute() throws IOException {
+    Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
+    
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("engine.type", equalTo(EngineType.DIESEL)));
+    assertEquals(getVwPassat().toString(), reader.read().toString());
+    assertNull(reader.read());
+
+    reader = new AvroParquetReader<Car>(path, column("engine.capacity", equalTo(1.4f)));
+    assertEquals(getVwPolo().toString(), reader.read().toString());
+    assertNull(reader.read());
+
+    reader = new AvroParquetReader<Car>(path, column("engine.hasTurboCharger", equalTo(true)));
+    assertEquals(getBmwMini().toString(), reader.read().toString());
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testProjection() throws IOException {
+    Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
+    Configuration conf = new Configuration();
+
+    Schema schema = Car.getClassSchema();
+    List<Schema.Field> fields = schema.getFields();
+
+    List<Schema.Field> projectedFields = new ArrayList<Schema.Field>();
+    for (Schema.Field field : fields) {
+      String name = field.name();
+      if ("optionalExtra".equals(name) ||
+          "serviceHistory".equals(name)) {
+        continue;
+      }
+
+      Schema.Field fieldClone = new Schema.Field(name, field.schema(), field.doc(), field.defaultValue());
+      projectedFields.add(fieldClone);
+    }
+
+    Schema projectedSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
+    projectedSchema.setFields(projectedFields);
+    AvroReadSupport.setRequestedProjection(conf, projectedSchema);
+
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(conf, path);
+    for (Car car = reader.read(); car != null; car = reader.read()) {
+      assertNotNull(car.getDoors());
+      assertNotNull(car.getEngine());
+      assertNotNull(car.getMake());
+      assertNotNull(car.getModel());
+      assertNotNull(car.getYear());
+      assertNotNull(car.getVin());
+      assertNull(car.getOptionalExtra());
+      assertNull(car.getServiceHistory());
+    }
+  }
+
+  @Test
+  public void testAvroReadSchema() throws IOException {
+    Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
+    Configuration conf = new Configuration();
+    AvroReadSupport.setAvroReadSchema(conf, NewCar.SCHEMA$);
+
+    ParquetReader<NewCar> reader = new AvroParquetReader<NewCar>(conf, path);
+    for (NewCar car = reader.read(); car != null; car = reader.read()) {
+      assertNotNull(car.getEngine());
+      assertNotNull(car.getBrand());
+      assertNotNull(car.getYear());
+      assertNotNull(car.getVin());
+      assertNull(car.getDescription());
+      assertEquals(5, (int) car.getOpt());
+    }
+  }
+
+  private Path writeCarsToParquetFile(int num, CompressionCodecName compression, boolean enableDictionary) throws IOException {
+    return writeCarsToParquetFile(num, compression, enableDictionary, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  private Path writeCarsToParquetFile(int num, CompressionCodecName compression, boolean enableDictionary, int blockSize, int pageSize) throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path path = new Path(tmp.getPath());
+
+    Car vwPolo   = getVwPolo();
+    Car vwPassat = getVwPassat();
+    Car bmwMini  = getBmwMini();
+
+    ParquetWriter<Car> writer = new AvroParquetWriter<Car>(path, Car.SCHEMA$, compression,
+        blockSize, pageSize, enableDictionary);
+    for (int i = 0; i < num; i++) {
+      writer.write(vwPolo);
+      writer.write(vwPassat);
+      writer.write(bmwMini);
+    }
+    writer.close();
+    return path;
+  }
+
+  public static Car getVwPolo() {
+    String vin = "WVWDB4505LK000001";
+    return Car.newBuilder()
+        .setYear(2010)
+        .setRegistration("A123 GTR")
+        .setMake("Volkswagen")
+        .setModel("Polo")
+        .setVin(new Vin(vin.getBytes()))
+        .setDoors(4)
+        .setEngine(Engine.newBuilder().setType(EngineType.PETROL)
+                  .setCapacity(1.4f).setHasTurboCharger(false).build())
+        .setOptionalExtra(
+            Stereo.newBuilder().setMake("Blaupunkt").setSpeakers(4).build())
+        .setServiceHistory(ImmutableList.of(
+            Service.newBuilder().setDate(1325376000L).setMechanic("Jim").build(),
+            Service.newBuilder().setDate(1356998400L).setMechanic("Mike").build()))
+        .build();
+  }
+
+  public static Car getVwPassat() {
+    String vin = "WVWDB4505LK000002";
+    return Car.newBuilder()
+        .setYear(2010)
+        .setRegistration("A123 GXR")
+        .setMake("Volkswagen")
+        .setModel("Passat")
+        .setVin(new Vin(vin.getBytes()))
+        .setDoors(5)
+        .setEngine(Engine.newBuilder().setType(EngineType.DIESEL)
+            .setCapacity(2.0f).setHasTurboCharger(false).build())
+        .setOptionalExtra(
+            LeatherTrim.newBuilder().setColour("Black").build())
+        .setServiceHistory(ImmutableList.of(
+            Service.newBuilder().setDate(1325376000L).setMechanic("Jim").build()))
+        .build();
+  }
+
+  public static Car getBmwMini() {
+    String vin = "WBABA91060AL00003";
+    return Car.newBuilder()
+        .setYear(2010)
+        .setRegistration("A124 GSR")
+        .setMake("BMW")
+        .setModel("Mini")
+        .setVin(new Vin(vin.getBytes()))
+        .setDoors(4)
+        .setEngine(Engine.newBuilder().setType(EngineType.PETROL)
+            .setCapacity(1.6f).setHasTurboCharger(true).build())
+        .setOptionalExtra(null)
+        .setServiceHistory(ImmutableList.of(
+            Service.newBuilder().setDate(1356998400L).setMechanic("Mike").build()))
+        .build();
+  }
+}
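
The tests above all follow the same shape: write Cars with AvroParquetWriter, then read them back through AvroParquetReader, optionally passing an unbound record filter or a requested projection. A minimal sketch of that pattern outside the test harness, assuming it sits in the same org.apache.parquet.avro test package so the generated Car model and the static helpers above are visible, and using a hypothetical /tmp path:

    package org.apache.parquet.avro;  // same package as the generated Car model

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    import static org.apache.parquet.filter.ColumnPredicates.equalTo;
    import static org.apache.parquet.filter.ColumnRecordFilter.column;
    import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
    import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;

    public class FilteredReadSketch {
      public static void main(String[] args) throws Exception {
        Path path = new Path("/tmp/cars.parquet"); // hypothetical location

        // Write a couple of records using the generated Car model.
        ParquetWriter<Car> writer = new AvroParquetWriter<Car>(
            path, Car.SCHEMA$, CompressionCodecName.UNCOMPRESSED,
            DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, false /* no dictionary */);
        writer.write(TestSpecificReadWrite.getVwPolo());
        writer.write(TestSpecificReadWrite.getBmwMini());
        writer.close();

        // Read back only rows whose "make" column equals "BMW"; the
        // unbound record filter is applied record-by-record during reads.
        ParquetReader<Car> reader = new AvroParquetReader<Car>(
            path, column("make", equalTo("BMW")));
        for (Car car = reader.read(); car != null; car = reader.read()) {
          System.out.println(car);
        }
        reader.close();
      }
    }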

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
deleted file mode 100644
index aba5ef3..0000000
--- a/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.avro;
-
-import com.google.common.collect.Lists;
-import java.util.Arrays;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.codehaus.jackson.node.NullNode;
-
-public class AvroTestUtil {
-
-  public static Schema record(String name, Schema.Field... fields) {
-    Schema record = Schema.createRecord(name, null, null, false);
-    record.setFields(Arrays.asList(fields));
-    return record;
-  }
-
-  public static Schema.Field field(String name, Schema schema) {
-    return new Schema.Field(name, schema, null, null);
-  }
-
-  public static Schema.Field optionalField(String name, Schema schema) {
-    return new Schema.Field(name, optional(schema), null, NullNode.getInstance());
-  }
-
-  public static Schema array(Schema element) {
-    return Schema.createArray(element);
-  }
-
-  public static Schema primitive(Schema.Type type) {
-    return Schema.create(type);
-  }
-
-  public static Schema optional(Schema original) {
-    return Schema.createUnion(Lists.newArrayList(
-        Schema.create(Schema.Type.NULL),
-        original));
-  }
-
-  public static GenericRecord instance(Schema schema, Object... pairs) {
-    if ((pairs.length % 2) != 0) {
-      throw new RuntimeException("Not enough values");
-    }
-    GenericRecord record = new GenericData.Record(schema);
-    for (int i = 0; i < pairs.length; i += 2) {
-      record.put(pairs[i].toString(), pairs[i + 1]);
-    }
-    return record;
-  }
-
-}
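
The helpers in the deleted AvroTestUtil keep the expected-schema declarations in the compatibility tests compact: record() and field() wrap Schema.createRecord and Schema.Field, optional() builds a null union, and instance() fills a GenericRecord from name/value pairs. A short usage sketch against the old parquet.avro package that this commit renames, with illustrative names:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;

    import static parquet.avro.AvroTestUtil.*;

    public class AvroTestUtilSketch {
      public static void main(String[] args) {
        // One required int field plus one optional (nullable) string field.
        Schema point = record("Point",
            field("x", primitive(Schema.Type.INT)),
            optionalField("label", primitive(Schema.Type.STRING)));

        // instance() pairs up names and values positionally.
        GenericRecord p = instance(point, "x", 3, "label", "origin");
        System.out.println(p); // prints a JSON-style rendering of the record
      }
    }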

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
deleted file mode 100644
index 62beed2..0000000
--- a/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
+++ /dev/null
@@ -1,999 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.avro;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-import static parquet.avro.AvroTestUtil.array;
-import static parquet.avro.AvroTestUtil.field;
-import static parquet.avro.AvroTestUtil.instance;
-import static parquet.avro.AvroTestUtil.optional;
-import static parquet.avro.AvroTestUtil.optionalField;
-import static parquet.avro.AvroTestUtil.primitive;
-import static parquet.avro.AvroTestUtil.record;
-
-public class TestArrayCompatibility {
-
-  @Rule
-  public final TemporaryFolder tempDir = new TemporaryFolder();
-
-  public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
-
-  @BeforeClass
-  public static void setupNewBehaviorConfiguration() {
-    NEW_BEHAVIOR_CONF.setBoolean(
-        AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
-  }
-
-  @Test
-  @Ignore(value="Not yet supported")
-  public void testUnannotatedListOfPrimitives() throws Exception {
-    Path test = writeDirect(
-        "message UnannotatedListOfPrimitives {" +
-            "  repeated int32 list_of_ints;" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("list_of_ints", 0);
-
-            rc.addInteger(34);
-            rc.addInteger(35);
-            rc.addInteger(36);
-
-            rc.endField("list_of_ints", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema expectedSchema = record("OldPrimitiveInList",
-        field("list_of_ints", array(primitive(Schema.Type.INT))));
-
-    GenericRecord expectedRecord = instance(expectedSchema,
-        "list_of_ints", Arrays.asList(34, 35, 36));
-
-    // both should behave the same way
-    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
-    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
-  }
-
-  @Test
-  @Ignore(value="Not yet supported")
-  public void testUnannotatedListOfGroups() throws Exception {
-    Path test = writeDirect(
-        "message UnannotatedListOfGroups {" +
-            "  repeated group list_of_points {" +
-            "    required float x;" +
-            "    required float y;" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("list_of_points", 0);
-
-            rc.startGroup();
-            rc.startField("x", 0);
-            rc.addFloat(1.0f);
-            rc.endField("x", 0);
-            rc.startField("y", 1);
-            rc.addFloat(1.0f);
-            rc.endField("y", 1);
-            rc.endGroup();
-
-            rc.startGroup();
-            rc.startField("x", 0);
-            rc.addFloat(2.0f);
-            rc.endField("x", 0);
-            rc.startField("y", 1);
-            rc.addFloat(2.0f);
-            rc.endField("y", 1);
-            rc.endGroup();
-
-            rc.endField("list_of_points", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema point = record("?",
-        field("x", primitive(Schema.Type.FLOAT)),
-        field("y", primitive(Schema.Type.FLOAT)));
-    Schema expectedSchema = record("OldPrimitiveInList",
-        field("list_of_points", array(point)));
-
-    GenericRecord expectedRecord = instance(expectedSchema,
-        "list_of_points", Arrays.asList(
-            instance(point, "x", 1.0f, "y", 1.0f),
-            instance(point, "x", 2.0f, "y", 2.0f)));
-
-    // both should behave the same way
-    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
-    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
-  }
-
-  @Test
-  public void testRepeatedPrimitiveInList() throws Exception {
-    Path test = writeDirect(
-        "message RepeatedPrimitiveInList {" +
-            "  required group list_of_ints (LIST) {" +
-            "    repeated int32 array;" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("list_of_ints", 0);
-
-            rc.startGroup();
-            rc.startField("array", 0);
-
-            rc.addInteger(34);
-            rc.addInteger(35);
-            rc.addInteger(36);
-
-            rc.endField("array", 0);
-            rc.endGroup();
-
-            rc.endField("list_of_ints", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema expectedSchema = record("RepeatedPrimitiveInList",
-        field("list_of_ints", array(Schema.create(Schema.Type.INT))));
-
-    GenericRecord expectedRecord = instance(expectedSchema,
-        "list_of_ints", Arrays.asList(34, 35, 36));
-
-    // both should behave the same way
-    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
-    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
-  }
-
-  @Test
-  public void testMultiFieldGroupInList() throws Exception {
-    // tests the missing element layer, detected by a multi-field group
-    Path test = writeDirect(
-        "message MultiFieldGroupInList {" +
-            "  optional group locations (LIST) {" +
-            "    repeated group element {" +
-            "      required double latitude;" +
-            "      required double longitude;" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("locations", 0);
-
-            rc.startGroup();
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(0.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(180.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup();
-
-            rc.endField("locations", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema location = record("element",
-        field("latitude", primitive(Schema.Type.DOUBLE)),
-        field("longitude", primitive(Schema.Type.DOUBLE)));
-    Schema expectedSchema = record("MultiFieldGroupInList",
-        optionalField("locations", array(location)));
-
-    GenericRecord expectedRecord = instance(expectedSchema,
-        "locations", Arrays.asList(
-            instance(location, "latitude", 0.0, "longitude", 0.0),
-            instance(location, "latitude", 0.0, "longitude", 180.0)));
-
-    // both should behave the same way
-    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
-    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
-  }
-
-  @Test
-  public void testSingleFieldGroupInList() throws Exception {
-    // this tests the case where non-avro older data has an ambiguous list
-    Path test = writeDirect(
-        "message SingleFieldGroupInList {" +
-            "  optional group single_element_groups (LIST) {" +
-            "    repeated group single_element_group {" +
-            "      required int64 count;" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("single_element_groups", 0);
-
-            rc.startGroup();
-            rc.startField("single_element_group", 0); // start writing array contents
-
-            rc.startGroup();
-            rc.startField("count", 0);
-            rc.addLong(1234L);
-            rc.endField("count", 0);
-            rc.endGroup();
-
-            rc.startGroup();
-            rc.startField("count", 0);
-            rc.addLong(2345L);
-            rc.endField("count", 0);
-            rc.endGroup();
-
-            rc.endField("single_element_group", 0); // finished writing array contents
-            rc.endGroup();
-
-            rc.endField("single_element_groups", 0);
-            rc.endMessage();
-          }
-        });
-
-    // can't tell from storage whether this should be a list of single-field
-    // records or if the single_element_group layer is synthetic.
-
-    // old behavior - assume that the repeated type is the element type
-    Schema singleElementGroupSchema = record("single_element_group",
-        field("count", primitive(Schema.Type.LONG)));
-    Schema oldSchema = record("SingleFieldGroupInList",
-        optionalField("single_element_groups", array(singleElementGroupSchema)));
-    GenericRecord oldRecord = instance(oldSchema,
-        "single_element_groups", Arrays.asList(
-            instance(singleElementGroupSchema, "count", 1234L),
-            instance(singleElementGroupSchema, "count", 2345L)));
-
-    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
-
-    // new behavior - assume that single_element_group is synthetic (in spec)
-    Schema newSchema = record("SingleFieldGroupInList",
-        optionalField("single_element_groups", array(primitive(Schema.Type.LONG))));
-    GenericRecord newRecord = instance(newSchema,
-        "single_element_groups", Arrays.asList(1234L, 2345L));
-
-    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
-  }
-
-  @Test
-  public void testSingleFieldGroupInListWithSchema() throws Exception {
-    // this tests the case where older data has an ambiguous structure, but the
-    // correct interpretation can be determined from the avro schema
-
-    Schema singleElementRecord = record("single_element_group",
-        field("count", primitive(Schema.Type.LONG)));
-
-    Schema expectedSchema = record("SingleFieldGroupInList",
-        optionalField("single_element_groups",
-            array(singleElementRecord)));
-
-    Map<String, String> metadata = new HashMap<String, String>();
-    metadata.put(AvroWriteSupport.AVRO_SCHEMA, expectedSchema.toString());
-
-    Path test = writeDirect(
-        "message SingleFieldGroupInList {" +
-            "  optional group single_element_groups (LIST) {" +
-            "    repeated group single_element_group {" +
-            "      required int64 count;" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("single_element_groups", 0);
-
-            rc.startGroup();
-            rc.startField("single_element_group", 0); // start writing array contents
-
-            rc.startGroup();
-            rc.startField("count", 0);
-            rc.addLong(1234L);
-            rc.endField("count", 0);
-            rc.endGroup();
-
-            rc.startGroup();
-            rc.startField("count", 0);
-            rc.addLong(2345L);
-            rc.endField("count", 0);
-            rc.endGroup();
-
-            rc.endField("single_element_group", 0); // finished writing array contents
-            rc.endGroup();
-
-            rc.endField("single_element_groups", 0);
-            rc.endMessage();
-          }
-        },
-        metadata);
-
-    GenericRecord expectedRecord = instance(expectedSchema,
-        "single_element_groups", Arrays.asList(
-            instance(singleElementRecord, "count", 1234L),
-            instance(singleElementRecord, "count", 2345L)));
-
-    // both should behave the same way because the schema is present
-    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
-    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
-  }
-
-  @Test
-  public void testNewOptionalGroupInList() throws Exception {
-    Path test = writeDirect(
-        "message NewOptionalGroupInList {" +
-            "  optional group locations (LIST) {" +
-            "    repeated group list {" +
-            "      optional group element {" +
-            "        required double latitude;" +
-            "        required double longitude;" +
-            "      }" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("locations", 0);
-
-            rc.startGroup();
-            rc.startField("list", 0); // start writing array contents
-
-            // write a non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(0.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            // write a null element (element field is omitted)
-            rc.startGroup(); // array level
-            rc.endGroup(); // array level
-
-            // write a second non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(180.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            rc.endField("list", 0); // finished writing array contents
-            rc.endGroup();
-
-            rc.endField("locations", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema location = record("element",
-        field("latitude", primitive(Schema.Type.DOUBLE)),
-        field("longitude", primitive(Schema.Type.DOUBLE)));
-
-    // old behavior - assume that the repeated type is the element type
-    Schema elementRecord = record("list", optionalField("element", location));
-    Schema oldSchema = record("NewOptionalGroupInList",
-        optionalField("locations", array(elementRecord)));
-    GenericRecord oldRecord = instance(oldSchema,
-        "locations", Arrays.asList(
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 0.0)),
-            instance(elementRecord),
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 180.0))));
-
-    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
-
-    // new behavior - assume that the repeated "list" group is synthetic (in spec)
-    Schema newSchema = record("NewOptionalGroupInList",
-        optionalField("locations", array(optional(location))));
-    GenericRecord newRecord = instance(newSchema,
-        "locations", Arrays.asList(
-            instance(location, "latitude", 0.0, "longitude", 0.0),
-            null,
-            instance(location, "latitude", 0.0, "longitude", 180.0)));
-
-    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
-  }
-
-  @Test
-  public void testNewRequiredGroupInList() throws Exception {
-    Path test = writeDirect(
-        "message NewRequiredGroupInList {" +
-            "  optional group locations (LIST) {" +
-            "    repeated group list {" +
-            "      required group element {" +
-            "        required double latitude;" +
-            "        required double longitude;" +
-            "      }" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("locations", 0);
-
-            rc.startGroup();
-            rc.startField("list", 0); // start writing array contents
-
-            // write a non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(180.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            // write a second non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(0.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            rc.endField("list", 0); // finished writing array contents
-            rc.endGroup();
-
-            rc.endField("locations", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema location = record("element",
-        field("latitude", primitive(Schema.Type.DOUBLE)),
-        field("longitude", primitive(Schema.Type.DOUBLE)));
-
-    // old behavior - assume that the repeated type is the element type
-    Schema elementRecord = record("list", field("element", location));
-    Schema oldSchema = record("NewRequiredGroupInList",
-        optionalField("locations", array(elementRecord)));
-    GenericRecord oldRecord = instance(oldSchema,
-        "locations", Arrays.asList(
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 180.0)),
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 0.0))));
-
-    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
-
-    // new behavior - assume that the repeated "list" group is synthetic (in spec)
-    Schema newSchema = record("NewRequiredGroupInList",
-        optionalField("locations", array(location)));
-    GenericRecord newRecord = instance(newSchema,
-        "locations", Arrays.asList(
-            instance(location, "latitude", 0.0, "longitude", 180.0),
-            instance(location, "latitude", 0.0, "longitude", 0.0)));
-
-    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
-  }
-
-  @Test
-  public void testAvroCompatRequiredGroupInList() throws Exception {
-    Path test = writeDirect(
-        "message AvroCompatRequiredGroupInList {" +
-            "  optional group locations (LIST) {" +
-            "    repeated group array {" +
-            "      optional group element {" +
-            "        required double latitude;" +
-            "        required double longitude;" +
-            "      }" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("locations", 0);
-
-            rc.startGroup();
-            rc.startField("array", 0); // start writing array contents
-
-            // write a non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(180.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            // write a second non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(0.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            rc.endField("array", 0); // finished writing array contents
-            rc.endGroup();
-
-            rc.endField("locations", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema location = record("element",
-        field("latitude", primitive(Schema.Type.DOUBLE)),
-        field("longitude", primitive(Schema.Type.DOUBLE)));
-
-    // old behavior - assume that the repeated type is the element type
-    Schema elementRecord = record("array", optionalField("element", location));
-    Schema oldSchema = record("AvroCompatRequiredGroupInList",
-        optionalField("locations", array(elementRecord)));
-    GenericRecord oldRecord = instance(oldSchema,
-        "locations", Arrays.asList(
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 180.0)),
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 0.0))));
-
-    // both should detect the "array" name
-    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
-    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
-  }
-
-  @Test
-  public void testAvroCompatRequiredGroupInListWithSchema() throws Exception {
-    Path test = writeDirect(
-        "message AvroCompatRequiredGroupInListWithSchema {" +
-            "  optional group locations (LIST) {" +
-            "    repeated group array {" +
-            "      optional group element {" +
-            "        required double latitude;" +
-            "        required double longitude;" +
-            "      }" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("locations", 0);
-
-            rc.startGroup();
-            rc.startField("array", 0); // start writing array contents
-
-            // write a non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(180.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            // write a second non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(0.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            rc.endField("array", 0); // finished writing array contents
-            rc.endGroup();
-
-            rc.endField("locations", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema location = record("element",
-        field("latitude", primitive(Schema.Type.DOUBLE)),
-        field("longitude", primitive(Schema.Type.DOUBLE)));
-
-    Schema newSchema = record("HiveCompatOptionalGroupInList",
-        optionalField("locations", array(optional(location))));
-    GenericRecord newRecord = instance(newSchema,
-        "locations", Arrays.asList(
-            instance(location, "latitude", 0.0, "longitude", 180.0),
-            instance(location, "latitude", 0.0, "longitude", 0.0)));
-
-    Configuration oldConfWithSchema = new Configuration();
-    AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);
-
-    // both should use the schema structure that is provided
-    assertReaderContains(
-        new AvroParquetReader<GenericRecord>(oldConfWithSchema, test),
-        newSchema, newRecord);
-
-    Configuration newConfWithSchema = new Configuration(NEW_BEHAVIOR_CONF);
-    AvroReadSupport.setAvroReadSchema(newConfWithSchema, newSchema);
-
-    assertReaderContains(
-        new AvroParquetReader<GenericRecord>(newConfWithSchema, test),
-        newSchema, newRecord);
-  }
-
-  @Test
-  public void testThriftCompatRequiredGroupInList() throws Exception {
-    Path test = writeDirect(
-        "message ThriftCompatRequiredGroupInList {" +
-            "  optional group locations (LIST) {" +
-            "    repeated group locations_tuple {" +
-            "      optional group element {" +
-            "        required double latitude;" +
-            "        required double longitude;" +
-            "      }" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("locations", 0);
-
-            rc.startGroup();
-            rc.startField("locations_tuple", 0); // start writing array contents
-
-            // write a non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(180.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            // write a second non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(0.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            rc.endField("locations_tuple", 0); // finished writing array contents
-            rc.endGroup();
-
-            rc.endField("locations", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema location = record("element",
-        field("latitude", primitive(Schema.Type.DOUBLE)),
-        field("longitude", primitive(Schema.Type.DOUBLE)));
-
-    // old behavior - assume that the repeated type is the element type
-    Schema elementRecord = record("locations_tuple", optionalField("element", location));
-    Schema oldSchema = record("ThriftCompatRequiredGroupInList",
-        optionalField("locations", array(elementRecord)));
-    GenericRecord oldRecord = instance(oldSchema,
-        "locations", Arrays.asList(
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 180.0)),
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 0.0))));
-
-    // both should detect the "array" name
-    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
-    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
-  }
-
-  @Test
-  public void testHiveCompatOptionalGroupInList() throws Exception {
-    Path test = writeDirect(
-        "message HiveCompatOptionalGroupInList {" +
-            "  optional group locations (LIST) {" +
-            "    repeated group bag {" +
-            "      optional group element {" +
-            "        required double latitude;" +
-            "        required double longitude;" +
-            "      }" +
-            "    }" +
-            "  }" +
-            "}",
-        new DirectWriter() {
-          @Override
-          public void write(RecordConsumer rc) {
-            rc.startMessage();
-            rc.startField("locations", 0);
-
-            rc.startGroup();
-            rc.startField("bag", 0); // start writing array contents
-
-            // write a non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(180.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            // write a second non-null element
-            rc.startGroup(); // array level
-            rc.startField("element", 0);
-
-            rc.startGroup();
-            rc.startField("latitude", 0);
-            rc.addDouble(0.0);
-            rc.endField("latitude", 0);
-            rc.startField("longitude", 1);
-            rc.addDouble(0.0);
-            rc.endField("longitude", 1);
-            rc.endGroup();
-
-            rc.endField("element", 0);
-            rc.endGroup(); // array level
-
-            rc.endField("bag", 0); // finished writing array contents
-            rc.endGroup();
-
-            rc.endField("locations", 0);
-            rc.endMessage();
-          }
-        });
-
-    Schema location = record("element",
-        field("latitude", primitive(Schema.Type.DOUBLE)),
-        field("longitude", primitive(Schema.Type.DOUBLE)));
-
-    // old behavior - assume that the repeated type is the element type
-    Schema elementRecord = record("bag", optionalField("element", location));
-    Schema oldSchema = record("HiveCompatOptionalGroupInList",
-        optionalField("locations", array(elementRecord)));
-    GenericRecord oldRecord = instance(oldSchema,
-        "locations", Arrays.asList(
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 180.0)),
-            instance(elementRecord, "element",
-                instance(location, "latitude", 0.0, "longitude", 0.0))));
-
-    // both should detect the "array" name
-    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
-
-    Schema newSchema = record("HiveCompatOptionalGroupInList",
-        optionalField("locations", array(optional(location))));
-    GenericRecord newRecord = instance(newSchema,
-        "locations", Arrays.asList(
-            instance(location, "latitude", 0.0, "longitude", 180.0),
-            instance(location, "latitude", 0.0, "longitude", 0.0)));
-
-    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
-  }
-
-  private interface DirectWriter {
-    void write(RecordConsumer consumer);
-  }
-
-  private static class DirectWriteSupport extends WriteSupport<Void> {
-    private RecordConsumer recordConsumer;
-    private final MessageType type;
-    private final DirectWriter writer;
-    private final Map<String, String> metadata;
-
-    private DirectWriteSupport(MessageType type, DirectWriter writer,
-                               Map<String, String> metadata) {
-      this.type = type;
-      this.writer = writer;
-      this.metadata = metadata;
-    }
-
-    @Override
-    public WriteContext init(Configuration configuration) {
-      return new WriteContext(type, metadata);
-    }
-
-    @Override
-    public void prepareForWrite(RecordConsumer recordConsumer) {
-      this.recordConsumer = recordConsumer;
-    }
-
-    @Override
-    public void write(Void record) {
-      writer.write(recordConsumer);
-    }
-  }
-
-  private Path writeDirect(String type, DirectWriter writer) throws IOException {
-    return writeDirect(MessageTypeParser.parseMessageType(type), writer);
-  }
-
-  private Path writeDirect(String type, DirectWriter writer,
-                           Map<String, String> metadata) throws IOException {
-    return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
-  }
-
-  private Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
-    return writeDirect(type, writer, new HashMap<String, String>());
-  }
-
-  private Path writeDirect(MessageType type, DirectWriter writer,
-                           Map<String, String> metadata) throws IOException {
-    File temp = tempDir.newFile(UUID.randomUUID().toString());
-    temp.deleteOnExit();
-    temp.delete();
-
-    Path path = new Path(temp.getPath());
-
-    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
-        path, new DirectWriteSupport(type, writer, metadata));
-    parquetWriter.write(null);
-    parquetWriter.close();
-
-    return path;
-  }
-
-  public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
-      Path path) throws IOException {
-    return new AvroParquetReader<T>(path);
-  }
-
-  public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
-      Path path) throws IOException {
-    return new AvroParquetReader<T>(NEW_BEHAVIOR_CONF, path);
-  }
-
-  public <T extends IndexedRecord> void assertReaderContains(
-      AvroParquetReader<T> reader, Schema expectedSchema, T... expectedRecords)
-      throws IOException {
-    for (T expectedRecord : expectedRecords) {
-      T actualRecord = reader.read();
-      Assert.assertEquals("Should match expected schema",
-          expectedSchema, actualRecord.getSchema());
-      Assert.assertEquals("Should match the expected record",
-          expectedRecord, actualRecord);
-    }
-    Assert.assertNull("Should only contain " + expectedRecords.length +
-            " record" + (expectedRecords.length == 1 ? "" : "s"),
-        reader.read());
-  }
-
-}
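
All of the old-versus-new comparisons in TestArrayCompatibility turn on a single converter flag, ADD_LIST_ELEMENT_RECORDS. A sketch of flipping it at read time, assuming the sketch sits in the pre-rename parquet.avro package (as the deleted file does) so the flag is visible, with a hypothetical file path:

    package parquet.avro;  // pre-rename package, same as the converter

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ListBehaviorSketch {
      public static void main(String[] args) throws Exception {
        Path path = new Path("/tmp/lists.parquet"); // hypothetical file

        // Old behavior (the default): the repeated group inside a LIST
        // annotation is taken to be the element type itself.
        AvroParquetReader<GenericRecord> oldReader =
            new AvroParquetReader<GenericRecord>(path);

        // New behavior: single-field repeated groups such as "list" or
        // "bag" are treated as the synthetic wrapper the spec describes,
        // while writer-specific names like "array" and "*_tuple" are
        // still recognized as real element records.
        Configuration conf = new Configuration();
        conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
        AvroParquetReader<GenericRecord> newReader =
            new AvroParquetReader<GenericRecord>(conf, path);
      }
    }

Supplying an explicit Avro read schema via AvroReadSupport.setAvroReadSchema, as the *WithSchema tests above do, resolves the ambiguity in either mode.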

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/parquet/avro/TestAvroDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestAvroDataSupplier.java b/parquet-avro/src/test/java/parquet/avro/TestAvroDataSupplier.java
deleted file mode 100644
index c01def9..0000000
--- a/parquet-avro/src/test/java/parquet/avro/TestAvroDataSupplier.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.avro;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestAvroDataSupplier {
-
-  public static class GenericDataSupplier implements AvroDataSupplier {
-    @Override
-    public GenericData get() {
-      return GenericData.get();
-    }
-  }
-
-  @Test
-  public void testSetSupplierMethod() {
-    Configuration conf = new Configuration(false);
-    AvroReadSupport.setAvroDataSupplier(conf, GenericDataSupplier.class);
-    Assert.assertEquals("Should contain the class name",
-        "parquet.avro.TestAvroDataSupplier$GenericDataSupplier",
-        conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER));
-  }
-}
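
The AvroDataSupplier hook exercised above lets a reader swap in a different Avro data model, for example forcing the generic representation even when specific classes are on the classpath. A wiring sketch with the pre-rename package names and a hypothetical file path:

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import parquet.avro.AvroDataSupplier;
    import parquet.avro.AvroParquetReader;
    import parquet.avro.AvroReadSupport;

    public class SupplierSketch {
      // Always hand back the generic data model.
      public static class GenericDataSupplier implements AvroDataSupplier {
        @Override
        public GenericData get() {
          return GenericData.get();
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        AvroReadSupport.setAvroDataSupplier(conf, GenericDataSupplier.class);

        Path path = new Path("/tmp/records.parquet"); // hypothetical file
        AvroParquetReader<GenericRecord> reader =
            new AvroParquetReader<GenericRecord>(conf, path);
        GenericRecord first = reader.read(); // materialized with GenericData
        reader.close();
      }
    }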

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
deleted file mode 100644
index efdc9ed..0000000
--- a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.avro;
-
-import com.google.common.collect.Lists;
-import com.google.common.io.Resources;
-import java.util.Arrays;
-import org.apache.avro.Schema;
-import org.codehaus.jackson.node.NullNode;
-import org.junit.Test;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestAvroSchemaConverter {
-
-    public static final String ALL_PARQUET_SCHEMA =
-      "message parquet.avro.myrecord {\n" +
-      "  required boolean myboolean;\n" +
-      "  required int32 myint;\n" +
-      "  required int64 mylong;\n" +
-      "  required float myfloat;\n" +
-      "  required double mydouble;\n" +
-      "  required binary mybytes;\n" +
-      "  required binary mystring (UTF8);\n" +
-      "  required group mynestedrecord {\n" +
-      "    required int32 mynestedint;\n" +
-      "  }\n" +
-      "  required binary myenum (ENUM);\n" +
-      "  required group myarray (LIST) {\n" +
-      "    repeated int32 array;\n" +
-      "  }\n" +
-      "  optional group myoptionalarray (LIST) {\n" +
-      "    repeated int32 array;\n" +
-      "  }\n" +
-      "  required group myrecordarray (LIST) {\n" +
-      "    repeated group array {\n" +
-      "      required int32 a;\n" +
-      "      required int32 b;\n" +
-      "    }\n" +
-      "  }\n" +
-      "  required group mymap (MAP) {\n" +
-      "    repeated group map (MAP_KEY_VALUE) {\n" +
-      "      required binary key (UTF8);\n" +
-      "      required int32 value;\n" +
-      "    }\n" +
-      "  }\n" +
-      "  required fixed_len_byte_array(1) myfixed;\n" +
-      "}\n";
-
-  private void testAvroToParquetConversion(Schema avroSchema, String schemaString)
-      throws Exception {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
-    MessageType schema = avroSchemaConverter.convert(avroSchema);
-    MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
-    assertEquals("converting " + schema + " to " + schemaString, expectedMT.toString(),
-        schema.toString());
-  }
-
-  private void testParquetToAvroConversion(Schema avroSchema, String schemaString)
-      throws Exception {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
-    Schema schema = avroSchemaConverter.convert(
-        MessageTypeParser.parseMessageType(schemaString));
-    assertEquals("converting " + schemaString + " to " + avroSchema, avroSchema.toString(),
-        schema.toString());
-  }
-
-  private void testRoundTripConversion(Schema avroSchema, String schemaString)
-      throws Exception {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
-    MessageType schema = avroSchemaConverter.convert(avroSchema);
-    MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
-    assertEquals("converting " + schema + " to " + schemaString, expectedMT.toString(),
-        schema.toString());
-    Schema convertedAvroSchema = avroSchemaConverter.convert(expectedMT);
-    assertEquals("converting " + expectedMT + " to " + avroSchema.toString(true),
-        avroSchema.toString(), convertedAvroSchema.toString());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testTopLevelMustBeARecord() {
-    new AvroSchemaConverter().convert(Schema.create(Schema.Type.INT));
-  }
-
-  @Test
-  public void testAllTypes() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        Resources.getResource("all.avsc").openStream());
-    testAvroToParquetConversion(
-        schema,
-        "message parquet.avro.myrecord {\n" +
-            // Avro nulls are not encoded, unless they are null unions
-            "  required boolean myboolean;\n" +
-            "  required int32 myint;\n" +
-            "  required int64 mylong;\n" +
-            "  required float myfloat;\n" +
-            "  required double mydouble;\n" +
-            "  required binary mybytes;\n" +
-            "  required binary mystring (UTF8);\n" +
-            "  required group mynestedrecord {\n" +
-            "    required int32 mynestedint;\n" +
-            "  }\n" +
-            "  required binary myenum (ENUM);\n" +
-            "  required group myarray (LIST) {\n" +
-            "    repeated int32 array;\n" +
-            "  }\n" +
-            "  required group myemptyarray (LIST) {\n" +
-            "    repeated int32 array;\n" +
-            "  }\n" +
-            "  optional group myoptionalarray (LIST) {\n" +
-            "    repeated int32 array;\n" +
-            "  }\n" +
-            "  required group mymap (MAP) {\n" +
-            "    repeated group map (MAP_KEY_VALUE) {\n" +
-            "      required binary key (UTF8);\n" +
-            "      required int32 value;\n" +
-            "    }\n" +
-            "  }\n" +
-            "  required group myemptymap (MAP) {\n" +
-            "    repeated group map (MAP_KEY_VALUE) {\n" +
-            "      required binary key (UTF8);\n" +
-            "      required int32 value;\n" +
-            "    }\n" +
-            "  }\n" +
-            "  required fixed_len_byte_array(1) myfixed;\n" +
-            "}\n");
-  }
-
-  @Test
-  public void testAllTypesParquetToAvro() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        Resources.getResource("allFromParquet.avsc").openStream());
-    // Cannot use round-trip assertion because enum is lost
-    testParquetToAvroConversion(schema, ALL_PARQUET_SCHEMA);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testParquetMapWithNonStringKeyFails() throws Exception {
-    MessageType parquetSchema = MessageTypeParser.parseMessageType(
-      "message myrecord {\n" +
-        "  required group mymap (MAP) {\n" +
-        "    repeated group map (MAP_KEY_VALUE) {\n" +
-        "      required int32 key;\n" +
-        "      required int32 value;\n" +
-        "    }\n" +
-        "  }\n" +
-        "}\n"
-    );
-    new AvroSchemaConverter().convert(parquetSchema);
-  }
-
-  @Test
-  public void testOptionalFields() throws Exception {
-    Schema schema = Schema.createRecord("record1", null, null, false);
-    Schema optionalInt = optional(Schema.create(Schema.Type.INT));
-    schema.setFields(Arrays.asList(
-        new Schema.Field("myint", optionalInt, null, NullNode.getInstance())
-    ));
-    testRoundTripConversion(
-        schema,
-        "message record1 {\n" +
-            "  optional int32 myint;\n" +
-            "}\n");
-  }
-
-  @Test
-  public void testOptionalMapValue() throws Exception {
-    Schema schema = Schema.createRecord("record1", null, null, false);
-    Schema optionalIntMap = Schema.createMap(optional(Schema.create(Schema.Type.INT)));
-    schema.setFields(Arrays.asList(
-        new Schema.Field("myintmap", optionalIntMap, null, null)
-    ));
-    testRoundTripConversion(
-        schema,
-        "message record1 {\n" +
-            "  required group myintmap (MAP) {\n" +
-            "    repeated group map (MAP_KEY_VALUE) {\n" +
-            "      required binary key (UTF8);\n" +
-            "      optional int32 value;\n" +
-            "    }\n" +
-            "  }\n" +
-            "}\n");
-  }
-
-  @Test
-  public void testUnionOfTwoTypes() throws Exception {
-    Schema schema = Schema.createRecord("record2", null, null, false);
-    Schema multipleTypes = Schema.createUnion(Arrays.asList(
-        Schema.create(Schema.Type.NULL),
-        Schema.create(Schema.Type.INT),
-        Schema.create(Schema.Type.FLOAT)));
-    schema.setFields(Arrays.asList(
-        new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance())));
-
-    // An Avro union is modelled using optional data members of the different
-    // types. This does not translate back into an Avro union.
-    testAvroToParquetConversion(
-        schema,
-        "message record2 {\n" +
-            "  optional group myunion {\n" +
-            "    optional int32 member0;\n" +
-            "    optional float member1;\n" +
-            "  }\n" +
-            "}\n");
-  }
-
-  @Test
-  public void testArrayOfOptionalRecords() throws Exception {
-    Schema innerRecord = Schema.createRecord("InnerRecord", null, null, false);
-    Schema optionalString = optional(Schema.create(Schema.Type.STRING));
-    innerRecord.setFields(Lists.newArrayList(
-        new Schema.Field("s1", optionalString, null, NullNode.getInstance()),
-        new Schema.Field("s2", optionalString, null, NullNode.getInstance())
-    ));
-    Schema schema = Schema.createRecord("HasArray", null, null, false);
-    schema.setFields(Lists.newArrayList(
-        new Schema.Field("myarray", Schema.createArray(optional(innerRecord)),
-            null, NullNode.getInstance())
-    ));
-    System.err.println("Avro schema: " + schema.toString(true));
-
-    // Cannot use a round-trip assertion because the optional wrapper on InnerRecord is dropped
-    testAvroToParquetConversion(schema, "message HasArray {\n" +
-        "  required group myarray (LIST) {\n" +
-        "    repeated group array {\n" +
-        "      optional binary s1 (UTF8);\n" +
-        "      optional binary s2 (UTF8);\n" +
-        "    }\n" +
-        "  }\n" +
-        "}\n");
-  }
-
-  public static Schema optional(Schema original) {
-    return Schema.createUnion(Lists.newArrayList(
-        Schema.create(Schema.Type.NULL),
-        original));
-  }
-}
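
As the round-trip tests above show, the converter maps an Avro ["null", T] union onto a Parquet optional field and back. A minimal sketch of that mapping, assuming Avro's SchemaBuilder (available from Avro 1.7.5) and the pre-rename parquet.* packages; the record name "User" is illustrative:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import parquet.avro.AvroSchemaConverter;
    import parquet.schema.MessageType;

    public class OptionalFieldRoundTrip {
      public static void main(String[] args) {
        // One nullable int field, i.e. a ["null", "int"] union with default null.
        Schema avro = SchemaBuilder.record("User")
            .fields().optionalInt("age").endRecord();

        AvroSchemaConverter converter = new AvroSchemaConverter();
        MessageType parquet = converter.convert(avro);  // Avro -> Parquet
        Schema back = converter.convert(parquet);       // Parquet -> Avro

        System.out.println(parquet);              // contains "optional int32 age;"
        System.out.println(back.toString(true));  // a ["null", "int"] union again
      }
    }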

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/parquet/avro/TestBackwardCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestBackwardCompatibility.java b/parquet-avro/src/test/java/parquet/avro/TestBackwardCompatibility.java
deleted file mode 100644
index 4e614b5..0000000
--- a/parquet-avro/src/test/java/parquet/avro/TestBackwardCompatibility.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.avro;
-
-import com.google.common.io.Resources;
-import java.io.IOException;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Assert;
-import org.junit.Test;
-import parquet.hadoop.ParquetReader;
-
-public class TestBackwardCompatibility {
-
-  @Test
-  public void testStringCompatibility() throws IOException {
-    // Some older versions of Parquet used avro.schema instead of
-    // parquet.avro.schema and didn't annotate binary with UTF8 when the type
-    // was converted from an Avro string. This validates that the old read
-    // schema is recognized and used to read the file as expected.
-    Path testFile = new Path(Resources.getResource("strings-2.parquet").getFile());
-    Configuration conf = new Configuration();
-    ParquetReader<GenericRecord> reader = AvroParquetReader
-        .builder(new AvroReadSupport<GenericRecord>(), testFile)
-        .withConf(conf)
-        .build();
-    GenericRecord r;
-    while ((r = reader.read()) != null) {
-      Assert.assertTrue("Should read value into a String",
-          r.get("text") instanceof String);
-    }
-  }
-
-}
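
The test above relies on the file's stored write schema; a reader can also force the Avro types it wants by setting an explicit read schema. A sketch under the assumption that AvroReadSupport.setAvroReadSchema exists in this version; the file name and schema literal are illustrative:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import parquet.avro.AvroParquetReader;
    import parquet.avro.AvroReadSupport;
    import parquet.hadoop.ParquetReader;

    public class ExplicitReadSchema {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Declare "text" as an Avro string so the binary column reads back as text.
        Schema readSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Str\",\"fields\":"
            + "[{\"name\":\"text\",\"type\":\"string\"}]}");
        AvroReadSupport.setAvroReadSchema(conf, readSchema);

        ParquetReader<GenericRecord> reader = AvroParquetReader
            .builder(new AvroReadSupport<GenericRecord>(), new Path("strings-2.parquet"))
            .withConf(conf)
            .build();
        GenericRecord r;
        while ((r = reader.read()) != null) {
          System.out.println(r.get("text"));
        }
        reader.close();
      }
    }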

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/test/java/parquet/avro/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestInputOutputFormat.java b/parquet-avro/src/test/java/parquet/avro/TestInputOutputFormat.java
deleted file mode 100644
index d00b93e..0000000
--- a/parquet-avro/src/test/java/parquet/avro/TestInputOutputFormat.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.avro;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.junit.Test;
-import parquet.Log;
-
-import static java.lang.Thread.sleep;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class TestInputOutputFormat {
-  private static final Log LOG = Log.getLog(TestInputOutputFormat.class);
-
-  private static Schema avroSchema;
-  static {
-    avroSchema = Schema.createRecord("record1", null, null, false);
-    avroSchema.setFields(
-        Arrays.asList(new Schema.Field("a",
-            Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.NULL))),
-            null, null)));
-  }
-
-  public static GenericRecord nextRecord(Integer i) {
-    return new GenericRecordBuilder(avroSchema).set("a", i).build();
-  }
-
-  public static class MyMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {
-
-    public void run(Context context) throws IOException, InterruptedException {
-      for (int i = 0; i < 10; i++) {
-        GenericRecord a;
-        a = TestInputOutputFormat.nextRecord(i == 4 ? null : i);
-        context.write(null, a);
-      }
-    }
-  }
-
-  public static class MyMapper2 extends Mapper<Void, GenericRecord, LongWritable, Text> {
-    protected void map(Void key, GenericRecord value, Context context) throws IOException, InterruptedException {
-      context.write(null, new Text(value.toString()));
-    }
-
-  }
-
-  @Test
-  public void testReadWrite() throws Exception {
-
-    final Configuration conf = new Configuration();
-    final Path inputPath = new Path("src/test/java/parquet/avro/TestInputOutputFormat.java");
-    final Path parquetPath = new Path("target/test/hadoop/TestInputOutputFormat/parquet");
-    final Path outputPath = new Path("target/test/hadoop/TestInputOutputFormat/out");
-    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
-    fileSystem.delete(parquetPath, true);
-    fileSystem.delete(outputPath, true);
-    {
-      final Job job = new Job(conf, "write");
-
-      // the text input only triggers the mapper; MyMapper generates its own records
-      TextInputFormat.addInputPath(job, inputPath);
-      job.setInputFormatClass(TextInputFormat.class);
-
-      job.setMapperClass(TestInputOutputFormat.MyMapper.class);
-      job.setNumReduceTasks(0);
-
-      job.setOutputFormatClass(AvroParquetOutputFormat.class);
-      AvroParquetOutputFormat.setOutputPath(job, parquetPath);
-      AvroParquetOutputFormat.setSchema(job, avroSchema);
-
-      waitForJob(job);
-    }
-    {
-      final Job job = new Job(conf, "read");
-      job.setInputFormatClass(AvroParquetInputFormat.class);
-      AvroParquetInputFormat.setInputPaths(job, parquetPath);
-
-      job.setMapperClass(TestInputOutputFormat.MyMapper2.class);
-      job.setNumReduceTasks(0);
-
-      job.setOutputFormatClass(TextOutputFormat.class);
-      TextOutputFormat.setOutputPath(job, outputPath);
-
-      waitForJob(job);
-    }
-
-    final BufferedReader out = new BufferedReader(new FileReader(new File(outputPath.toString(), "part-m-00000")));
-    String lineOut = null;
-    int lineNumber = 0;
-    while ((lineOut = out.readLine()) != null) {
-      lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
-      GenericRecord a = nextRecord(lineNumber == 4 ? null : lineNumber);
-      assertEquals("line " + lineNumber, a.toString(), lineOut);
-      ++lineNumber;
-    }
-    assertNull("line " + lineNumber, out.readLine());
-    out.close();
-  }
-
-  private void waitForJob(Job job) throws Exception {
-    job.submit();
-    while (!job.isComplete()) {
-      LOG.debug("waiting for job " + job.getJobName());
-      sleep(100);
-    }
-    LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
-    if (!job.isSuccessful()) {
-      throw new RuntimeException("job failed " + job.getJobName());
-    }
-  }
-
-}
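
One note on the helper: the submit-and-poll loop in waitForJob is equivalent to a single blocking call on the mapreduce Job API. A minimal sketch of the same behavior:

    private void waitForJob(Job job) throws Exception {
      // waitForCompletion submits the job, blocks until it finishes, and
      // prints progress when verbose is true; it returns success or failure.
      if (!job.waitForCompletion(true)) {
        throw new RuntimeException("job failed " + job.getJobName());
      }
    }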

