parquet-commits mailing list archives

From b...@apache.org
Subject [1/2] parquet-mr git commit: PARQUET-243: Add Avro reflect support
Date Mon, 18 May 2015 17:08:48 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 60edcf9df -> a458e1a2f
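
For orientation, a minimal write-side sketch of the reflect support this commit adds, following the builder usage in the new TestReflectReadWrite below (Car, car, and path are placeholders, not part of the commit):

    // Derive an Avro schema from a plain Java class and write instances of it.
    Schema schema = ReflectData.get().getSchema(Car.class);
    ParquetWriter<Car> writer = AvroParquetWriter.<Car>builder(path)
        .withSchema(schema)
        .withDataModel(ReflectData.get())  // reflection instead of generic records
        .build();
    writer.write(car);
    writer.close();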


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/GenericDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/GenericDataSupplier.java b/parquet-avro/src/main/java/org/apache/parquet/avro/GenericDataSupplier.java
new file mode 100644
index 0000000..873c594
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/GenericDataSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+
+public class GenericDataSupplier implements AvroDataSupplier {
+  @Override
+  public GenericData get() {
+    return GenericData.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
new file mode 100644
index 0000000..67b710d
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+abstract class ParentValueContainer {
+
+  /**
+   * Adds the value to the parent.
+   */
+  public void add(Object value) {
+    throw new RuntimeException(
+        "[BUG] ParentValueContainer#add was not overridden");
+  }
+
+  public void addBoolean(boolean value) {
+    add(value);
+  }
+
+  public void addByte(byte value) {
+    add(value);
+  }
+
+  public void addChar(char value) {
+    add(value);
+  }
+
+  public void addShort(short value) {
+    add(value);
+  }
+
+  public void addInt(int value) {
+    add(value);
+  }
+
+  public void addLong(long value) {
+    add(value);
+  }
+
+  public void addFloat(float value) {
+    add(value);
+  }
+
+  public void addDouble(double value) {
+    add(value);
+  }
+
+}
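
ParentValueContainer is the callback a child converter uses to hand a completed value up to its parent. The primitive overloads delegate to add() by default, so a parent that only needs boxed values overrides add() alone; the overloads are an optimization hook. An illustrative sketch (record and index are hypothetical parent state, not names from this commit):

    ParentValueContainer container = new ParentValueContainer() {
      @Override
      public void add(Object value) {
        record.put(index, value);  // hypothetical: store the child's value in the parent record
      }
    };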

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/main/java/org/apache/parquet/avro/ReflectDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/ReflectDataSupplier.java b/parquet-avro/src/main/java/org/apache/parquet/avro/ReflectDataSupplier.java
new file mode 100644
index 0000000..9c4cede
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/ReflectDataSupplier.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectData;
+
+public class ReflectDataSupplier implements AvroDataSupplier {
+  @Override
+  public GenericData get() {
+    return ReflectData.get();
+  }
+}
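
These supplier classes are selected through the Hadoop Configuration to choose the in-memory Avro model; the new tests below use exactly these calls. A read-side sketch (path is a placeholder):

    Configuration conf = new Configuration();
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false); // use the new converters
    AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
    AvroWriteSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
    ParquetReader<Car> reader = new AvroParquetReader<Car>(conf, path);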

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
index aae11a7..d907bd4 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestBackwardCompatibility.java
@@ -30,7 +30,7 @@ import org.apache.parquet.hadoop.ParquetReader;
 public class TestBackwardCompatibility {
 
   @Test
-  public void testStringCompatibility() throws IOException {
+  public void testCompatStringCompatibility() throws IOException {
     // some older versions of Parquet used avro.schema instead of
     // parquet.avro.schema and didn't annotate binary with UTF8 when the type
     // was converted from an Avro string. this validates that the old read
@@ -48,4 +48,20 @@ public class TestBackwardCompatibility {
     }
   }
 
+  @Test
+  public void testStringCompatibility() throws IOException {
+    Path testFile = new Path(Resources.getResource("strings-2.parquet").getFile());
+    Configuration conf = new Configuration();
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    ParquetReader<GenericRecord> reader = AvroParquetReader
+        .builder(new AvroReadSupport<GenericRecord>(), testFile)
+        .withConf(conf)
+        .build();
+    GenericRecord r;
+    while ((r = reader.read()) != null) {
+      Assert.assertTrue("Should read value into a String",
+          r.get("text") instanceof String);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index f7d00c6..b558343 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -28,27 +28,43 @@ import java.util.*;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Fixed;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.node.NullNode;
 import org.junit.Test;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotNull;
 
+@RunWith(Parameterized.class)
 public class TestReadWrite {
 
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] {
+        { false },  // use the new converters
+        { true } }; // use the old converters
+    return Arrays.asList(data);
+  }
+
+  private final boolean compat;
+  private final Configuration testConf = new Configuration(false);
+
+  public TestReadWrite(boolean compat) {
+    this.compat = compat;
+    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+  }
+
   @Test
   public void testEmptyArray() throws Exception {
     Schema schema = new Schema.Parser().parse(
@@ -59,7 +75,7 @@ public class TestReadWrite {
     tmp.delete();
     Path file = new Path(tmp.getPath());
 
-    AvroParquetWriter<GenericRecord> writer = 
+    AvroParquetWriter<GenericRecord> writer =
         new AvroParquetWriter<GenericRecord>(file, schema);
 
     // Write a record with an empty array.
@@ -69,7 +85,7 @@ public class TestReadWrite {
     writer.write(record);
     writer.close();
 
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
     GenericRecord nextRecord = reader.read();
 
     assertNotNull(nextRecord);
@@ -96,7 +112,7 @@ public class TestReadWrite {
     writer.write(record);
     writer.close();
 
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
     GenericRecord nextRecord = reader.read();
 
     assertNotNull(nextRecord);
@@ -127,7 +143,7 @@ public class TestReadWrite {
     writer.write(record);
     writer.close();
 
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
     GenericRecord nextRecord = reader.read();
 
     assertNotNull(nextRecord);
@@ -179,7 +195,7 @@ public class TestReadWrite {
     writer.write(record);
     writer.close();
 
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
     GenericRecord nextRecord = reader.read();
 
     assertNotNull(nextRecord);
@@ -235,9 +251,12 @@ public class TestReadWrite {
     writer.write(record);
     writer.close();
 
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
     GenericRecord nextRecord = reader.read();
 
+    Object expectedEnumSymbol = compat ? "a" :
+        new GenericData.EnumSymbol(schema.getField("myenum").schema(), "a");
+
     assertNotNull(nextRecord);
     assertEquals(null, nextRecord.get("mynull"));
     assertEquals(true, nextRecord.get("myboolean"));
@@ -247,7 +266,7 @@ public class TestReadWrite {
     assertEquals(4.1, nextRecord.get("mydouble"));
     assertEquals(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)), nextRecord.get("mybytes"));
     assertEquals("hello", nextRecord.get("mystring"));
-    assertEquals("a", nextRecord.get("myenum"));
+    assertEquals(expectedEnumSymbol, nextRecord.get("myenum"));
     assertEquals(nestedRecord, nextRecord.get("mynestedrecord"));
     assertEquals(integerArray, nextRecord.get("myarray"));
     assertEquals(emptyArray, nextRecord.get("myemptyarray"));
@@ -437,7 +456,7 @@ public class TestReadWrite {
     GenericFixed genericFixed = new GenericData.Fixed(
         Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 });
 
-    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(file);
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file);
     GenericRecord nextRecord = reader.read();
     assertNotNull(nextRecord);
     assertEquals(true, nextRecord.get("myboolean"));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java
new file mode 100644
index 0000000..3e1d32e
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java
@@ -0,0 +1,495 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.Union;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.filter.ColumnPredicates;
+import org.apache.parquet.filter.ColumnRecordFilter;
+import org.apache.parquet.filter.RecordFilter;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReflectInputOutputFormat {
+  private static final Log LOG = Log.getLog(TestReflectInputOutputFormat.class);
+
+
+  public static class Service {
+    private long date;
+    private String mechanic;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Service service = (Service) o;
+
+      if (date != service.date) return false;
+      if (!mechanic.equals(service.mechanic)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (date ^ (date >>> 32));
+      result = 31 * result + mechanic.hashCode();
+      return result;
+    }
+  }
+
+  public static enum EngineType {
+    DIESEL, PETROL, ELECTRIC
+  }
+
+  public static class Engine {
+    private EngineType type;
+    private float capacity;
+    private boolean hasTurboCharger;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Engine engine = (Engine) o;
+
+      if (Float.compare(engine.capacity, capacity) != 0) return false;
+      if (hasTurboCharger != engine.hasTurboCharger) return false;
+      if (type != engine.type) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = type.hashCode();
+      result = 31 * result + (capacity != +0.0f ? Float.floatToIntBits(capacity) : 0);
+      result = 31 * result + (hasTurboCharger ? 1 : 0);
+      return result;
+    }
+  }
+
+  public static class Stereo extends Extra {
+    private String make;
+    private int speakers;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Stereo stereo = (Stereo) o;
+
+      if (speakers != stereo.speakers) return false;
+      if (!make.equals(stereo.make)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = make.hashCode();
+      result = 31 * result + speakers;
+      return result;
+    }
+  }
+
+  public static class LeatherTrim extends Extra {
+    private String colour;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      LeatherTrim that = (LeatherTrim) o;
+
+      if (!colour.equals(that.colour)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return colour.hashCode();
+    }
+  }
+
+  @Union({Void.class, Stereo.class, LeatherTrim.class})
+  public static class Extra {}
+
+  public static class Car {
+    private long year;
+    private String registration;
+    private String make;
+    private String model;
+    private byte[] vin;
+    private int doors;
+    private Engine engine;
+    private Extra optionalExtra = null;
+    @Nullable
+    private List<Service> serviceHistory = null;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Car car = (Car) o;
+
+      if (doors != car.doors) return false;
+      if (year != car.year) return false;
+      if (!engine.equals(car.engine)) return false;
+      if (!make.equals(car.make)) return false;
+      if (!model.equals(car.model)) return false;
+      if (optionalExtra != null ? !optionalExtra.equals(car.optionalExtra) : car.optionalExtra != null)
+        return false;
+      if (!registration.equals(car.registration)) return false;
+      if (serviceHistory != null ? !serviceHistory.equals(car.serviceHistory) : car.serviceHistory != null)
+        return false;
+      if (!Arrays.equals(vin, car.vin)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (year ^ (year >>> 32));
+      result = 31 * result + registration.hashCode();
+      result = 31 * result + make.hashCode();
+      result = 31 * result + model.hashCode();
+      result = 31 * result + Arrays.hashCode(vin);
+      result = 31 * result + doors;
+      result = 31 * result + engine.hashCode();
+      result = 31 * result + (optionalExtra != null ? optionalExtra.hashCode() : 0);
+      result = 31 * result + (serviceHistory != null ? serviceHistory.hashCode() : 0);
+      return result;
+    }
+  }
+
+  public static class ShortCar {
+    @Nullable
+    private String make = null;
+    private Engine engine;
+    private long year;
+    private byte[] vin;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ShortCar shortCar = (ShortCar) o;
+
+      if (year != shortCar.year) return false;
+      if (!engine.equals(shortCar.engine)) return false;
+      if (make != null ? !make.equals(shortCar.make) : shortCar.make != null)
+        return false;
+      if (!Arrays.equals(vin, shortCar.vin)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = make != null ? make.hashCode() : 0;
+      result = 31 * result + engine.hashCode();
+      result = 31 * result + (int) (year ^ (year >>> 32));
+      result = 31 * result + Arrays.hashCode(vin);
+      return result;
+    }
+  }
+
+  public static final Schema CAR_SCHEMA = ReflectData.get()//AllowNulls.INSTANCE
+      .getSchema(Car.class);
+  public static final Schema SHORT_CAR_SCHEMA = ReflectData.get()//AllowNulls.INSTANCE
+      .getSchema(ShortCar.class);
+
+  public static Car nextRecord(int i) {
+    Car car = new Car();
+    car.doors = 2;
+    car.make = "Tesla";
+    car.model = String.format("Model X v%d", i % 2);
+    car.vin = String.format("1VXBR12EXCP%06d", i).getBytes();
+    car.year = 2014 + i;
+    car.registration = "California";
+
+    LeatherTrim trim = new LeatherTrim();
+    trim.colour = "black";
+    car.optionalExtra = trim;
+
+    Engine engine = new Engine();
+    engine.capacity = 85.0f;
+    engine.type = (i % 2) == 0 ? EngineType.ELECTRIC : EngineType.PETROL;
+    engine.hasTurboCharger = false;
+    car.engine = engine;
+
+    if (i % 4 == 0) {
+      Service service = new Service();
+      service.date = 1374084640;
+      service.mechanic = "Elon Musk";
+      car.serviceHistory = Lists.newArrayList();
+      car.serviceHistory.add(service);
+    }
+
+    return car;
+  }
+
+  public static class MyMapper extends Mapper<LongWritable, Text, Void, Car> {
+    @Override
+    public void run(Context context) throws IOException, InterruptedException {
+      for (int i = 0; i < 10; i++) {
+        context.write(null, nextRecord(i));
+      }
+    }
+  }
+
+  public static class MyMapper2 extends Mapper<Void, Car, Void, Car> {
+    @Override
+    protected void map(Void key, Car car, Context context) throws IOException, InterruptedException {
+      // Note: Car can be null because of predicate pushdown defined by an UnboundRecordFilter (see below)
+      if (car != null) {
+        context.write(null, car);
+      }
+    }
+
+  }
+
+  public static class MyMapperShort extends
+      Mapper<Void, ShortCar, Void, ShortCar> {
+    @Override
+    protected void map(Void key, ShortCar car, Context context)
+        throws IOException, InterruptedException {
+      // Note: Car can be null because of predicate pushdown defined by an
+      // UnboundRecordFilter (see below)
+      if (car != null) {
+        context.write(null, car);
+      }
+    }
+
+  }
+
+  public static class ElectricCarFilter implements UnboundRecordFilter {
+    private final UnboundRecordFilter filter;
+
+    public ElectricCarFilter() {
+      filter = ColumnRecordFilter.column("engine.type", ColumnPredicates.equalTo(org.apache.parquet.avro.EngineType.ELECTRIC));
+    }
+
+    @Override
+    public RecordFilter bind(Iterable<ColumnReader> readers) {
+      return filter.bind(readers);
+    }
+  }
+
+  final Configuration conf = new Configuration();
+  final Path inputPath = new Path("src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java");
+  final Path parquetPath = new Path("target/test/hadoop/TestReflectInputOutputFormat/parquet");
+  final Path outputPath = new Path("target/test/hadoop/TestReflectInputOutputFormat/out");
+
+  @Before
+  public void createParquetFile() throws Exception {
+    // set up readers and writers not in MR
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
+    AvroWriteSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
+
+    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    fileSystem.delete(outputPath, true);
+    {
+      final Job job = new Job(conf, "write");
+
+      // input not really used
+      TextInputFormat.addInputPath(job, inputPath);
+      job.setInputFormatClass(TextInputFormat.class);
+
+      job.setMapperClass(TestReflectInputOutputFormat.MyMapper.class);
+      job.setNumReduceTasks(0);
+
+      job.setOutputFormatClass(AvroParquetOutputFormat.class);
+      AvroParquetOutputFormat.setOutputPath(job, parquetPath);
+      AvroParquetOutputFormat.setSchema(job, CAR_SCHEMA);
+      AvroParquetOutputFormat.setAvroDataSupplier(job, ReflectDataSupplier.class);
+
+      waitForJob(job);
+    }
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    final Job job = new Job(conf, "read");
+    job.setInputFormatClass(AvroParquetInputFormat.class);
+    AvroParquetInputFormat.setInputPaths(job, parquetPath);
+    // Test push-down predicates by using an electric car filter
+    AvroParquetInputFormat.setUnboundRecordFilter(job, ElectricCarFilter.class);
+
+    // Test schema projection by dropping the optional extras
+    Schema projection = Schema.createRecord(CAR_SCHEMA.getName(),
+        CAR_SCHEMA.getDoc(), CAR_SCHEMA.getNamespace(), false);
+    List<Schema.Field> fields = Lists.newArrayList();
+    for (Schema.Field field : ReflectData.get().getSchema(Car.class).getFields()) {
+      if (!"optionalExtra".equals(field.name())) {
+        fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
+            field.defaultValue(), field.order()));
+      }
+    }
+    projection.setFields(fields);
+    AvroParquetInputFormat.setRequestedProjection(job, projection);
+
+    job.setMapperClass(TestReflectInputOutputFormat.MyMapper2.class);
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AvroParquetOutputFormat.class);
+    AvroParquetOutputFormat.setOutputPath(job, outputPath);
+    AvroParquetOutputFormat.setSchema(job, CAR_SCHEMA);
+
+    waitForJob(job);
+
+    final Path mapperOutput = new Path(outputPath.toString(),
+        "part-m-00000.parquet");
+    final AvroParquetReader<Car> out = new AvroParquetReader<Car>(conf, mapperOutput);
+    Car car;
+    Car previousCar = null;
+    int lineNumber = 0;
+    while ((car = out.read()) != null) {
+      if (previousCar != null) {
+         // Testing reference equality here. The "model" field should be dictionary-encoded.
+         assertTrue(car.model == previousCar.model);
+      }
+      // Make sure that predicate push down worked as expected
+      if (car.engine.type == EngineType.PETROL) {
+        fail("UnboundRecordFilter failed to remove cars with PETROL engines");
+      }
+      // Note we use lineNumber * 2 because of predicate push down
+      Car expectedCar = nextRecord(lineNumber * 2);
+      // We removed the optional extra field using projection so we shouldn't
+      // see it here...
+      expectedCar.optionalExtra = null;
+      assertEquals("line " + lineNumber, expectedCar, car);
+      ++lineNumber;
+      previousCar = car;
+    }
+    out.close();
+  }
+
+  @Test
+  public void testReadWriteChangedCar() throws Exception {
+
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    final Job job = new Job(conf, "read changed/short");
+    job.setInputFormatClass(AvroParquetInputFormat.class);
+    AvroParquetInputFormat.setInputPaths(job, parquetPath);
+    // Test push-down predicates by using an electric car filter
+    AvroParquetInputFormat.setUnboundRecordFilter(job, ElectricCarFilter.class);
+
+    // Test schema projection by dropping the engine, year, and vin (like ShortCar),
+    // but making make optional (unlike ShortCar)
+    Schema projection = Schema.createRecord(CAR_SCHEMA.getName(),
+        CAR_SCHEMA.getDoc(), CAR_SCHEMA.getNamespace(), false);
+    List<Schema.Field> fields = Lists.newArrayList();
+    for (Schema.Field field : CAR_SCHEMA.getFields()) {
+      // No make!
+      if ("engine".equals(field.name()) || "year".equals(field.name()) || "vin".equals(field.name()))
{
+        fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
+            field.defaultValue(), field.order()));
+      }
+    }
+    projection.setFields(fields);
+    AvroParquetInputFormat.setRequestedProjection(job, projection);
+    AvroParquetInputFormat.setAvroReadSchema(job, SHORT_CAR_SCHEMA);
+
+    job.setMapperClass(TestReflectInputOutputFormat.MyMapperShort.class);
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AvroParquetOutputFormat.class);
+    AvroParquetOutputFormat.setOutputPath(job, outputPath);
+    AvroParquetOutputFormat.setSchema(job, SHORT_CAR_SCHEMA);
+
+    waitForJob(job);
+
+    final Path mapperOutput = new Path(outputPath.toString(), "part-m-00000.parquet");
+    final AvroParquetReader<ShortCar> out = new AvroParquetReader<ShortCar>(conf, mapperOutput);
+    ShortCar car;
+    int lineNumber = 0;
+    while ((car = out.read()) != null) {
+      // Make sure that predicate push down worked as expected
+      // Note we use lineNumber * 2 because of predicate push down
+      Car expectedCar = nextRecord(lineNumber * 2);
+      // We removed the optional extra field using projection so we shouldn't see it here...
+      assertNull(car.make);
+      assertEquals(car.engine, expectedCar.engine);
+      assertEquals(car.year, expectedCar.year);
+      assertArrayEquals(car.vin, expectedCar.vin);
+      ++lineNumber;
+    }
+    out.close();
+  }
+
+  private void waitForJob(Job job) throws Exception {
+    job.submit();
+    while (!job.isComplete()) {
+      LOG.debug("waiting for job " + job.getJobName());
+      sleep(100);
+    }
+    LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS"
: "FAILURE"));
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("job failed " + job.getJobName());
+    }
+  }
+
+  @After
+  public void deleteOutputFile() throws IOException {
+    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    fileSystem.delete(outputPath, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
new file mode 100644
index 0000000..dffaf57
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestReflectReadWrite {
+
+  @Test
+  public void testReadWriteReflect() throws IOException {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
+
+    Path path = writePojosToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false);
+    ParquetReader<Pojo> reader = new AvroParquetReader<Pojo>(conf, path);
+    Pojo object = getPojo();
+    for (int i = 0; i < 10; i++) {
+      assertEquals(object, reader.read());
+    }
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testWriteReflectReadGeneric() throws IOException {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    AvroReadSupport.setAvroDataSupplier(conf, GenericDataSupplier.class);
+
+    Path path = writePojosToParquetFile(2, CompressionCodecName.UNCOMPRESSED, false);
+    ParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(conf, path);
+    GenericRecord object = getGenericPojo();
+    for (int i = 0; i < 2; i += 1) {
+      assertEquals(object, reader.read());
+    }
+    assertNull(reader.read());
+  }
+
+  private GenericRecord getGenericPojo() {
+    Schema schema = ReflectData.get().getSchema(Pojo.class);
+    GenericData.Record record = new GenericData.Record(schema);
+    record.put("myboolean", true);
+    record.put("mybyte", 1);
+    record.put("myshort", 1);
+    record.put("myint", 1);
+    record.put("mylong", 2L);
+    record.put("myfloat", 3.1f);
+    record.put("mydouble", 4.1);
+    record.put("mybytes", ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
+    record.put("mystring", "Hello");
+    record.put("myenum", new GenericData.EnumSymbol(
+        schema.getField("myenum").schema(), "A"));
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("a", "1");
+    map.put("b", "2");
+    record.put("mymap", map);
+    record.put("myshortarray", new GenericData.Array<Integer>(
+        schema.getField("myshortarray").schema(), Lists.newArrayList(1, 2)));
+    record.put("myintarray", new GenericData.Array<Integer>(
+        schema.getField("myintarray").schema(), Lists.newArrayList(1, 2)));
+    record.put("mystringarray", new GenericData.Array<String>(
+        schema.getField("mystringarray").schema(), Lists.newArrayList("a", "b")));
+    record.put("mylist", new GenericData.Array<String>(
+        schema.getField("mylist").schema(), Lists.newArrayList("a", "b", "c")));
+    return record;
+  }
+
+  private Pojo getPojo() {
+    Pojo object = new Pojo();
+    object.myboolean = true;
+    object.mybyte = 1;
+    object.myshort = 1;
+    object.myint = 1;
+    object.mylong = 2L;
+    object.myfloat = 3.1f;
+    object.mydouble = 4.1;
+    object.mybytes = new byte[] { 1, 2, 3, 4 };
+    object.mystring = "Hello";
+    object.myenum = E.A;
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("a", "1");
+    map.put("b", "2");
+    object.mymap = map;
+    object.myshortarray = new short[] { 1, 2 };
+    object.myintarray = new int[] { 1, 2 };
+    object.mystringarray = new String[] { "a", "b" };
+    object.mylist = Lists.newArrayList("a", "b", "c");
+    return object;
+  }
+
+  private Path writePojosToParquetFile(int num, CompressionCodecName compression,
+                                        boolean enableDictionary) throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path path = new Path(tmp.getPath());
+
+    Pojo object = getPojo();
+
+    Schema schema = ReflectData.get().getSchema(object.getClass());
+    ParquetWriter<Pojo> writer = AvroParquetWriter.<Pojo>builder(path)
+        .withSchema(schema)
+        .withCompressionCodec(compression)
+        .withDataModel(ReflectData.get())
+        .withDictionaryEncoding(enableDictionary)
+        .build();
+    for (int i = 0; i < num; i++) {
+      writer.write(object);
+    }
+    writer.close();
+    return path;
+  }
+
+  public static enum E {
+    A, B
+  }
+
+  public static class Pojo {
+    public boolean myboolean;
+    public byte mybyte;
+    public short myshort;
+    // no char until https://issues.apache.org/jira/browse/AVRO-1458 is fixed
+    public int myint;
+    public long mylong;
+    public float myfloat;
+    public double mydouble;
+    public byte[] mybytes;
+    public String mystring;
+    public E myenum;
+    private Map<String, String> mymap;
+    private short[] myshortarray;
+    private int[] myintarray;
+    private String[] mystringarray;
+    private List<String> mylist;
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Pojo)) return false;
+      Pojo that = (Pojo) o;
+      return myboolean == that.myboolean
+          && mybyte == that.mybyte
+          && myshort == that.myshort
+          && myint == that.myint
+          && mylong == that.mylong
+          && myfloat == that.myfloat
+          && mydouble == that.mydouble
+          && Arrays.equals(mybytes, that.mybytes)
+          && mystring.equals(that.mystring)
+          && myenum == that.myenum
+          && mymap.equals(that.mymap)
+          && Arrays.equals(myshortarray, that.myshortarray)
+          && Arrays.equals(myintarray, that.myintarray)
+          && Arrays.equals(mystringarray, that.mystringarray)
+          && mylist.equals(that.mylist);
+    }
+
+    @Override
+    public String toString() {
+      return "Pojo{" +
+          "myboolean=" + myboolean +
+          ", mybyte=" + mybyte +
+          ", myshort=" + myshort +
+          ", myint=" + myint +
+          ", mylong=" + mylong +
+          ", myfloat=" + myfloat +
+          ", mydouble=" + mydouble +
+          ", mybytes=" + Arrays.toString(mybytes) +
+          ", mystring='" + mystring + '\'' +
+          ", myenum=" + myenum +
+          ", mymap=" + mymap +
+          ", myshortarray=" + Arrays.toString(myshortarray) +
+          ", myintarray=" + Arrays.toString(myintarray) +
+          ", mystringarray=" + Arrays.toString(mystringarray) +
+          ", mylist=" + mylist +
+          '}';
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
index f01f009..61ab3e3 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -29,6 +29,8 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -37,17 +39,34 @@ import org.junit.Test;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Other tests exercise the use of Avro Generic, a dynamic data representation. This class focuses
  * on Avro Specific, whose schemas are pre-compiled to POJOs with built-in SerDe for faster serialization.
  */
+@RunWith(Parameterized.class)
 public class TestSpecificReadWrite {
 
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] {
+        { false },  // use the new converters
+        { true } }; // use the old converters
+    return Arrays.asList(data);
+  }
+
+  private final Configuration testConf = new Configuration(false);
+
+  public TestSpecificReadWrite(boolean compat) {
+    this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
+  }
+
   @Test
-  public void testReadWriteSpecific() throws IOException {
+  public void testCompatReadWriteSpecific() throws IOException {
     Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false);
-    ParquetReader<Car> reader = new AvroParquetReader<Car>(path);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(testConf, path);
     for (int i = 0; i < 10; i++) {
       assertEquals(getVwPolo().toString(), reader.read().toString());
       assertEquals(getVwPassat().toString(), reader.read().toString());
@@ -59,7 +78,7 @@ public class TestSpecificReadWrite {
   @Test
   public void testReadWriteSpecificWithDictionary() throws IOException {
     Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, true);
-    ParquetReader<Car> reader = new AvroParquetReader<Car>(path);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(testConf, path);
     for (int i = 0; i < 10; i++) {
       assertEquals(getVwPolo().toString(), reader.read().toString());
       assertEquals(getVwPassat().toString(), reader.read().toString());
@@ -71,7 +90,7 @@ public class TestSpecificReadWrite {
   @Test
   public void testFilterMatchesMultiple() throws IOException {
     Path path = writeCarsToParquetFile(10, CompressionCodecName.UNCOMPRESSED, false);
-    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(testConf, path, column("make", equalTo("Volkswagen")));
     for (int i = 0; i < 10; i++) {
       assertEquals(getVwPolo().toString(), reader.read().toString());
       assertEquals(getVwPassat().toString(), reader.read().toString());
@@ -82,7 +101,7 @@ public class TestSpecificReadWrite {
   @Test
   public void testFilterMatchesMultipleBlocks() throws IOException {
     Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
-    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(testConf, path, column("make", equalTo("Volkswagen")));
     for (int i = 0; i < 10000; i++) {
       assertEquals(getVwPolo().toString(), reader.read().toString());
       assertEquals(getVwPassat().toString(), reader.read().toString());
@@ -93,7 +112,7 @@ public class TestSpecificReadWrite {
   @Test
   public void testFilterMatchesNoBlocks() throws IOException {
     Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
-    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Bogus")));
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(testConf, path, column("make", equalTo("Bogus")));
     assertNull(reader.read());
   }
 
@@ -119,7 +138,7 @@ public class TestSpecificReadWrite {
     writer.write(bmwMini); // only write BMW in last block
     writer.close();
 
-    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make",
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(testConf, path, column("make",
         equalTo("BMW")));
     assertEquals(getBmwMini().toString(), reader.read().toString());
     assertNull(reader.read());
@@ -128,7 +147,7 @@ public class TestSpecificReadWrite {
   @Test
   public void testFilterWithDictionary() throws IOException {
     Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, true);
-    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(testConf, path, column("make", equalTo("Volkswagen")));
     assertEquals(getVwPolo().toString(), reader.read().toString());
     assertEquals(getVwPassat().toString(), reader.read().toString());
     assertNull(reader.read());
@@ -138,15 +157,15 @@ public class TestSpecificReadWrite {
   public void testFilterOnSubAttribute() throws IOException {
     Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
     
-    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("engine.type", equalTo(EngineType.DIESEL)));
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(testConf, path, column("engine.type", equalTo(EngineType.DIESEL)));
     assertEquals(reader.read().toString(), getVwPassat().toString());
     assertNull(reader.read());
 
-    reader = new AvroParquetReader<Car>(path, column("engine.capacity", equalTo(1.4f)));
+    reader = new AvroParquetReader<Car>(testConf, path, column("engine.capacity", equalTo(1.4f)));
     assertEquals(getVwPolo().toString(), reader.read().toString());
     assertNull(reader.read());
 
-    reader = new AvroParquetReader<Car>(path, column("engine.hasTurboCharger", equalTo(true)));
+    reader = new AvroParquetReader<Car>(testConf, path, column("engine.hasTurboCharger", equalTo(true)));
     assertEquals(getBmwMini().toString(), reader.read().toString());
     assertNull(reader.read());
   }
@@ -154,7 +173,7 @@ public class TestSpecificReadWrite {
   @Test
   public void testProjection() throws IOException {
     Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(testConf);
 
     Schema schema = Car.getClassSchema();
     List<Schema.Field> fields = schema.getFields();
@@ -193,7 +212,7 @@ public class TestSpecificReadWrite {
   @Test
   public void testAvroReadSchema() throws IOException {
     Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(testConf);
     AvroReadSupport.setAvroReadSchema(conf, NewCar.SCHEMA$);
 
     ParquetReader<NewCar> reader = new AvroParquetReader<NewCar>(conf, path);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/parquet-column/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index b9dea05..b806d3d 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -62,7 +62,7 @@
     <dependency>
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil</artifactId>
-      <version>6.5.7</version>
+      <version>${fastutil.version}</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/a458e1a2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7b46e8a..dfe2cd8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
     <pig.version>0.11.1</pig.version>
     <pig.classifier />
     <thrift.version>0.7.0</thrift.version>
+    <fastutil.version>6.5.7</fastutil.version>
   </properties>
 
   <modules>

