parquet-commits mailing list archives

From jul...@apache.org
Subject [1/4] Add a unified and optionally more constrained API for expressing filters on columns
Date Tue, 29 Jul 2014 21:39:15 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master fc2c29df7 -> ad32bf0fd
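
This patch introduces the filter2 API: typed column bindings, predicates
composed via FilterApi, and a FilterCompat layer that unifies the new
FilterPredicate with the older UnboundRecordFilter. A minimal usage sketch
distilled from the tests in this patch (the wrapper class is illustrative):

    import parquet.filter2.compat.FilterCompat;
    import parquet.filter2.compat.FilterCompat.Filter;
    import parquet.filter2.predicate.FilterPredicate;
    import parquet.filter2.predicate.Operators.IntColumn;

    import static parquet.filter2.predicate.FilterApi.*;

    public class FilterApiSketch {
      public static void main(String[] args) {
        // bind a typed column to a dotted column path
        IntColumn foo = intColumn("foo");

        // compose a predicate; a null value means "is null" / "is not null"
        FilterPredicate pred = or(eq(foo, 7), and(gt(foo, 10), notEq(foo, null)));

        // wrap it in the unified Filter handed to readers and input formats
        Filter filter = FilterCompat.get(pred);
      }
    }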


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/test/java/parquet/filter2/compat/TestRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/filter2/compat/TestRowGroupFilter.java b/parquet-hadoop/src/test/java/parquet/filter2/compat/TestRowGroupFilter.java
new file mode 100644
index 0000000..a688ef8
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/filter2/compat/TestRowGroupFilter.java
@@ -0,0 +1,84 @@
+package parquet.filter2.compat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import parquet.column.statistics.IntStatistics;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import static org.junit.Assert.assertEquals;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.hadoop.TestInputFormat.makeBlockFromStats;
+
+public class TestRowGroupFilter {
+  @Test
+  public void testApplyRowGroupFilters() {
+
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+    IntStatistics stats1 = new IntStatistics();
+    stats1.setMinMax(10, 100);
+    stats1.setNumNulls(4);
+    BlockMetaData b1 = makeBlockFromStats(stats1, 301);
+    blocks.add(b1);
+
+    IntStatistics stats2 = new IntStatistics();
+    stats2.setMinMax(8, 102);
+    stats2.setNumNulls(0);
+    BlockMetaData b2 = makeBlockFromStats(stats2, 302);
+    blocks.add(b2);
+
+    IntStatistics stats3 = new IntStatistics();
+    stats3.setMinMax(100, 102);
+    stats3.setNumNulls(12);
+    BlockMetaData b3 = makeBlockFromStats(stats3, 303);
+    blocks.add(b3);
+
+
+    IntStatistics stats4 = new IntStatistics();
+    stats4.setMinMax(0, 0);
+    stats4.setNumNulls(304);
+    BlockMetaData b4 = makeBlockFromStats(stats4, 304);
+    blocks.add(b4);
+
+
+    IntStatistics stats5 = new IntStatistics();
+    stats5.setMinMax(50, 50);
+    stats5.setNumNulls(7);
+    BlockMetaData b5 = makeBlockFromStats(stats5, 305);
+    blocks.add(b5);
+
+    IntStatistics stats6 = new IntStatistics();
+    stats6.setMinMax(0, 0);
+    stats6.setNumNulls(12);
+    BlockMetaData b6 = makeBlockFromStats(stats6, 306);
+    blocks.add(b6);
+
+    MessageType schema = MessageTypeParser.parseMessageType("message Document { optional int32 foo; }");
+    IntColumn foo = intColumn("foo");
+
+    List<BlockMetaData> filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b5), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, 50)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, null)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b3, b4, b5, b6), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, null)), blocks, schema);
+    assertEquals(Arrays.asList(b1, b2, b3, b5, b6), filtered);
+
+    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 0)), blocks, schema);
+    assertEquals(Arrays.asList(b6), filtered);
+  }
+
+}
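
The expected lists above follow from min/max and null-count pruning: a row
group survives eq(foo, 50) only if 50 falls inside its [min, max] and it is
not all nulls, and survives eq(foo, null) only if numNulls > 0. A sketch of
that drop rule (illustrative only, not the library's actual implementation):

    import parquet.column.statistics.IntStatistics;

    public class EqDropRuleSketch {
      // true when a row group can safely be skipped for eq(column, v)
      static boolean canDropForEq(IntStatistics stats, long rowCount, Integer v) {
        if (v == null) {
          // looking for nulls: drop groups that contain none
          return stats.getNumNulls() == 0;
        }
        if (stats.getNumNulls() == rowCount) {
          // every value is null: no non-null value can match
          return true;
        }
        return v < stats.getMin() || v > stats.getMax();
      }
    }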

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/PhoneBookWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/PhoneBookWriter.java
new file mode 100644
index 0000000..99f0a4e
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -0,0 +1,251 @@
+package parquet.filter2.recordlevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import parquet.example.data.Group;
+import parquet.example.data.simple.SimpleGroup;
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.hadoop.ParquetReader;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.example.GroupReadSupport;
+import parquet.hadoop.example.GroupWriteSupport;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+public class PhoneBookWriter {
+  private static final String schemaString =
+      "message user {\n"
+          + "  required int64 id;\n"
+          + "  optional binary name (UTF8);\n"
+          + "  optional group location {\n"
+          + "    optional double lon;\n"
+          + "    optional double lat;\n"
+          + "  }\n"
+          + "  optional group phoneNumbers {\n"
+          + "    repeated group phone {\n"
+          + "      required int64 number;\n"
+          + "      optional binary kind (UTF8);\n"
+          + "    }\n"
+          + "  }\n"
+          + "}\n";
+
+  private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
+
+  public static class Location {
+    private final Double lon;
+    private final Double lat;
+
+    public Location(Double lon, Double lat) {
+      this.lon = lon;
+      this.lat = lat;
+    }
+
+    public Double getLon() {
+      return lon;
+    }
+
+    public Double getLat() {
+      return lat;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Location location = (Location) o;
+
+      if (lat != null ? !lat.equals(location.lat) : location.lat != null) return false;
+      if (lon != null ? !lon.equals(location.lon) : location.lon != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = lon != null ? lon.hashCode() : 0;
+      result = 31 * result + (lat != null ? lat.hashCode() : 0);
+      return result;
+    }
+  }
+
+  public static class PhoneNumber {
+    private final long number;
+    private final String kind;
+
+    public PhoneNumber(long number, String kind) {
+      this.number = number;
+      this.kind = kind;
+    }
+
+    public long getNumber() {
+      return number;
+    }
+
+    public String getKind() {
+      return kind;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      PhoneNumber that = (PhoneNumber) o;
+
+      if (number != that.number) return false;
+      if (kind != null ? !kind.equals(that.kind) : that.kind != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (number ^ (number >>> 32));
+      result = 31 * result + (kind != null ? kind.hashCode() : 0);
+      return result;
+    }
+  }
+
+  public static class User {
+    private final long id;
+    private final String name;
+    private final List<PhoneNumber> phoneNumbers;
+    private final Location location;
+
+    public User(long id, String name, List<PhoneNumber> phoneNumbers, Location location) {
+      this.id = id;
+      this.name = name;
+      this.phoneNumbers = phoneNumbers;
+      this.location = location;
+    }
+
+    public long getId() {
+      return id;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public List<PhoneNumber> getPhoneNumbers() {
+      return phoneNumbers;
+    }
+
+    public Location getLocation() {
+      return location;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      User user = (User) o;
+
+      if (id != user.id) return false;
+      if (location != null ? !location.equals(user.location) : user.location != null) return false;
+      if (name != null ? !name.equals(user.name) : user.name != null) return false;
+      if (phoneNumbers != null ? !phoneNumbers.equals(user.phoneNumbers) : user.phoneNumbers != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (id ^ (id >>> 32));
+      result = 31 * result + (name != null ? name.hashCode() : 0);
+      result = 31 * result + (phoneNumbers != null ? phoneNumbers.hashCode() : 0);
+      result = 31 * result + (location != null ? location.hashCode() : 0);
+      return result;
+    }
+  }
+
+  public static SimpleGroup groupFromUser(User user) {
+    SimpleGroup root = new SimpleGroup(schema);
+    root.append("id", user.getId());
+
+    if (user.getName() != null) {
+      root.append("name", user.getName());
+    }
+
+    if (user.getPhoneNumbers() != null) {
+      Group phoneNumbers = root.addGroup("phoneNumbers");
+      for (PhoneNumber number : user.getPhoneNumbers()) {
+        Group phone = phoneNumbers.addGroup("phone");
+        phone.append("number", number.getNumber());
+        if (number.getKind() != null) {
+          phone.append("kind", number.getKind());
+        }
+      }
+    }
+
+    if (user.getLocation() != null) {
+      Group location = root.addGroup("location");
+      if (user.getLocation().getLon() != null) {
+        location.append("lon", user.getLocation().getLon());
+      }
+      if (user.getLocation().getLat() != null) {
+        location.append("lat", user.getLocation().getLat());
+      }
+    }
+    return root;
+  }
+
+  public static File writeToFile(List<User> users) throws IOException {
+    File f = File.createTempFile("phonebook", ".parquet");
+    f.deleteOnExit();
+    if (!f.delete()) {
+      throw new IOException("couldn't delete tmp file " + f);
+    }
+
+    writeToFile(f, users);
+
+    return f;
+  }
+
+  public static void writeToFile(File f, List<User> users) throws IOException {
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(new Path(f.getAbsolutePath()), conf, new GroupWriteSupport());
+    for (User u : users) {
+      writer.write(groupFromUser(u));
+    }
+    writer.close();
+  }
+
+  public static List<Group> readFile(File f, Filter filter) throws IOException {
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    ParquetReader<Group> reader =
+        ParquetReader.builder(new GroupReadSupport(), new Path(f.getAbsolutePath()))
+                     .withConf(conf)
+                     .withFilter(filter)
+                     .build();
+
+    Group current;
+    List<Group> users = new ArrayList<Group>();
+
+    current = reader.read();
+    while (current != null) {
+      users.add(current);
+      current = reader.read();
+    }
+
+    return users;
+  }
+
+  public static void main(String[] args) throws IOException {
+    File f = new File(args[0]);
+    writeToFile(f, TestRecordLevelFilters.makeUsers());
+  }
+
+}
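
PhoneBookWriter doubles as a small end-to-end harness: build Users, write
them through the example Group API, and read them back through the new
unified Filter. A usage sketch (the demo class itself is illustrative):

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;

    import parquet.example.data.Group;
    import parquet.filter2.compat.FilterCompat;
    import parquet.filter2.recordlevel.PhoneBookWriter;
    import parquet.filter2.recordlevel.PhoneBookWriter.User;
    import parquet.io.api.Binary;

    import static parquet.filter2.predicate.FilterApi.binaryColumn;
    import static parquet.filter2.predicate.FilterApi.eq;

    public class PhoneBookDemo {
      public static void main(String[] args) throws Exception {
        List<User> users = Arrays.asList(
            new User(1, "alice", null, null),
            new User(2, "bob", null, null));

        File f = PhoneBookWriter.writeToFile(users);

        // record-level filter: keep only rows where name == "alice"
        List<Group> alices = PhoneBookWriter.readFile(f,
            FilterCompat.get(eq(binaryColumn("name"), Binary.fromString("alice"))));
        System.out.println(alices); // one Group, for user id 1
      }
    }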

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java
new file mode 100644
index 0000000..d771ead
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java
@@ -0,0 +1,205 @@
+package parquet.filter2.recordlevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import parquet.example.data.Group;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.Operators.BinaryColumn;
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Statistics;
+import parquet.filter2.predicate.UserDefinedPredicate;
+import parquet.filter2.recordlevel.PhoneBookWriter.Location;
+import parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
+import parquet.filter2.recordlevel.PhoneBookWriter.User;
+import parquet.io.api.Binary;
+
+import static org.junit.Assert.assertEquals;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.binaryColumn;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+
+public class TestRecordLevelFilters {
+
+  public static List<User> makeUsers() {
+    List<User> users = new ArrayList<User>();
+
+    users.add(new User(17, null, null, null));
+
+    users.add(new User(18, "bob", null, null));
+
+    users.add(new User(19, "alice", new ArrayList<PhoneNumber>(), null));
+
+    users.add(new User(20, "thing1", Arrays.asList(new PhoneNumber(5555555555L, null)), null));
+
+    users.add(new User(27, "thing2", Arrays.asList(new PhoneNumber(1111111111L, "home")), null));
+
+    users.add(new User(28, "popular", Arrays.asList(
+        new PhoneNumber(1111111111L, "home"),
+        new PhoneNumber(2222222222L, null),
+        new PhoneNumber(3333333333L, "mobile")
+    ), null));
+
+    users.add(new User(30, null, Arrays.asList(new PhoneNumber(1111111111L, "home")), null));
+
+    for (int i = 100; i < 200; i++) {
+      Location location = null;
+      if (i % 3 == 1) {
+        location = new Location((double) i, (double) i * 2);
+      }
+      if (i % 3 == 2) {
+        location = new Location((double) i, null);
+      }
+      users.add(new User(i, "p" + i, Arrays.asList(new PhoneNumber(i, "cell")), location));
+    }
+
+    return users;
+  }
+
+  private static File phonebookFile;
+  private static List<User> users;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    users = makeUsers();
+    phonebookFile = PhoneBookWriter.writeToFile(users);
+  }
+
+  private interface UserFilter {
+    boolean keep(User u);
+  }
+
+  private static List<Group> getExpected(UserFilter f) {
+    List<Group> expected = new ArrayList<Group>();
+    for (User u : users) {
+      if (f.keep(u)) {
+        expected.add(PhoneBookWriter.groupFromUser(u));
+      }
+    }
+    return expected;
+  }
+
+  private static void assertFilter(List<Group> found, UserFilter f) {
+    List<Group> expected = getExpected(f);
+    assertEquals(expected.size(), found.size());
+    Iterator<Group> expectedIter = expected.iterator();
+    Iterator<Group> foundIter = found.iterator();
+    while (expectedIter.hasNext()) {
+      assertEquals(expectedIter.next().toString(), foundIter.next().toString());
+    }
+  }
+
+  @Test
+  public void testNoFilter() throws Exception {
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.NOOP);
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        return true;
+      }
+    });
+  }
+
+  @Test
+  public void testAllFilter() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+
+    FilterPredicate pred = eq(name, Binary.fromString("no matches"));
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+    assertEquals(new ArrayList<Group>(), found);
+  }
+
+  @Test
+  public void testNameNotNull() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+
+    FilterPredicate pred = notEq(name, null);
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        return u.getName() != null;
+      }
+    });
+  }
+
+  public static class StartWithP extends UserDefinedPredicate<Binary> {
+
+    @Override
+    public boolean keep(Binary value) {
+      if (value == null) {
+        return false;
+      }
+      return value.toStringUsingUTF8().startsWith("p");
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Binary> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Binary> statistics) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testNameNotStartWithP() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+
+    FilterPredicate pred = not(userDefined(name, StartWithP.class));
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        return u.getName() == null || !u.getName().startsWith("p");
+      }
+    });
+  }
+
+  @Test
+  public void testComplex() throws Exception {
+    BinaryColumn name = binaryColumn("name");
+    DoubleColumn lon = doubleColumn("location.lon");
+    DoubleColumn lat = doubleColumn("location.lat");
+
+    FilterPredicate pred = or(and(gt(lon, 150.0), notEq(lat, null)), eq(name, Binary.fromString("alice")));
+
+    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
+
+    assertFilter(found, new UserFilter() {
+      @Override
+      public boolean keep(User u) {
+        String name = u.getName();
+        Double lat = null;
+        Double lon = null;
+        if (u.getLocation() != null) {
+          lat = u.getLocation().getLat();
+          lon = u.getLocation().getLon();
+        }
+
+        return (lon != null && lon > 150.0 && lat != null) || "alice".equals(name);
+      }
+    });
+  }
+}
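
StartWithP above opts out of block pruning by returning false from both
canDrop and inverseCanDrop, so keep() is consulted for every record. For
contrast, a sketch of a user-defined predicate that does prune on statistics
(the class is illustrative; null handling is elided):

    import parquet.filter2.predicate.Statistics;
    import parquet.filter2.predicate.UserDefinedPredicate;

    public class IsPositive extends UserDefinedPredicate<Integer> {
      @Override
      public boolean keep(Integer value) {
        return value != null && value > 0;
      }

      @Override
      public boolean canDrop(Statistics<Integer> statistics) {
        // every value in the block is <= 0, so nothing would be kept
        return statistics.getMax() <= 0;
      }

      @Override
      public boolean inverseCanDrop(Statistics<Integer> statistics) {
        // the inverted predicate keeps values <= 0; drop when all are > 0
        return statistics.getMin() > 0;
      }
    }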

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java
new file mode 100644
index 0000000..4e75b20
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java
@@ -0,0 +1,307 @@
+package parquet.filter2.statisticslevel;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.junit.Test;
+
+import parquet.column.Encoding;
+import parquet.column.statistics.DoubleStatistics;
+import parquet.column.statistics.IntStatistics;
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.LogicalInverseRewriter;
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Statistics;
+import parquet.filter2.predicate.UserDefinedPredicate;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.gtEq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.lt;
+import static parquet.filter2.predicate.FilterApi.ltEq;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.statisticslevel.StatisticsFilter.canDrop;
+
+public class TestStatisticsFilter {
+
+  private static ColumnChunkMetaData getIntColumnMeta(IntStatistics stats, long valueCount) {
+    return ColumnChunkMetaData.get(ColumnPath.get("int", "column"),
+        PrimitiveTypeName.INT32,
+        CompressionCodecName.GZIP,
+        new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+        stats,
+        0L, 0L, valueCount, 0L, 0L);
+  }
+
+  private static ColumnChunkMetaData getDoubleColumnMeta(DoubleStatistics stats, long valueCount) {
+    return ColumnChunkMetaData.get(ColumnPath.get("double", "column"),
+        PrimitiveTypeName.DOUBLE,
+        CompressionCodecName.GZIP,
+        new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+        stats,
+        0L, 0L, valueCount, 0L, 0L);
+  }
+
+  private static final IntColumn intColumn = intColumn("int.column");
+  private static final DoubleColumn doubleColumn = doubleColumn("double.column");
+
+  private static final IntStatistics intStats = new IntStatistics();
+  private static final IntStatistics nullIntStats = new IntStatistics();
+  private static final DoubleStatistics doubleStats = new DoubleStatistics();
+
+  static {
+    intStats.setMinMax(10, 100);
+    doubleStats.setMinMax(10, 100);
+
+    nullIntStats.setMinMax(0, 0);
+    nullIntStats.setNumNulls(177);
+  }
+
+  private static final List<ColumnChunkMetaData> columnMetas = Arrays.asList(
+      getIntColumnMeta(intStats, 177L),
+      getDoubleColumnMeta(doubleStats, 177L));
+
+  private static final List<ColumnChunkMetaData> nullColumnMetas = Arrays.asList(
+      getIntColumnMeta(nullIntStats, 177L), // column of all nulls
+      getDoubleColumnMeta(doubleStats, 177L));
+
+
+  @Test
+  public void testEqNonNull() {
+    assertTrue(canDrop(eq(intColumn, 9), columnMetas));
+    assertFalse(canDrop(eq(intColumn, 10), columnMetas));
+    assertFalse(canDrop(eq(intColumn, 100), columnMetas));
+    assertTrue(canDrop(eq(intColumn, 101), columnMetas));
+
+    // drop columns of all nulls when looking for non-null value
+    assertTrue(canDrop(eq(intColumn, 0), nullColumnMetas));
+  }
+
+  @Test
+  public void testEqNull() {
+    IntStatistics statsNoNulls = new IntStatistics();
+    statsNoNulls.setMinMax(10, 100);
+    statsNoNulls.setNumNulls(0);
+
+    IntStatistics statsSomeNulls = new IntStatistics();
+    statsSomeNulls.setMinMax(10, 100);
+    statsSomeNulls.setNumNulls(3);
+
+    assertTrue(canDrop(eq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsNoNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(eq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsSomeNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+  }
+
+  @Test
+  public void testNotEqNonNull() {
+    assertFalse(canDrop(notEq(intColumn, 9), columnMetas));
+    assertFalse(canDrop(notEq(intColumn, 10), columnMetas));
+    assertFalse(canDrop(notEq(intColumn, 100), columnMetas));
+    assertFalse(canDrop(notEq(intColumn, 101), columnMetas));
+
+    IntStatistics allSevens = new IntStatistics();
+    allSevens.setMinMax(7, 7);
+    assertTrue(canDrop(notEq(intColumn, 7), Arrays.asList(
+        getIntColumnMeta(allSevens, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+  }
+
+  @Test
+  public void testNotEqNull() {
+    IntStatistics statsNoNulls = new IntStatistics();
+    statsNoNulls.setMinMax(10, 100);
+    statsNoNulls.setNumNulls(0);
+
+    IntStatistics statsSomeNulls = new IntStatistics();
+    statsSomeNulls.setMinMax(10, 100);
+    statsSomeNulls.setNumNulls(3);
+
+    IntStatistics statsAllNulls = new IntStatistics();
+    statsAllNulls.setMinMax(0, 0);
+    statsAllNulls.setNumNulls(177);
+
+    assertFalse(canDrop(notEq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsNoNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(notEq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsSomeNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertTrue(canDrop(notEq(intColumn, null), Arrays.asList(
+        getIntColumnMeta(statsAllNulls, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+  }
+
+  @Test
+  public void testLt() {
+    assertTrue(canDrop(lt(intColumn, 9), columnMetas));
+    assertTrue(canDrop(lt(intColumn, 10), columnMetas));
+    assertFalse(canDrop(lt(intColumn, 100), columnMetas));
+    assertFalse(canDrop(lt(intColumn, 101), columnMetas));
+
+    assertTrue(canDrop(lt(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(lt(intColumn, 7), nullColumnMetas));
+  }
+
+  @Test
+  public void testLtEq() {
+    assertTrue(canDrop(ltEq(intColumn, 9), columnMetas));
+    assertFalse(canDrop(ltEq(intColumn, 10), columnMetas));
+    assertFalse(canDrop(ltEq(intColumn, 100), columnMetas));
+    assertFalse(canDrop(ltEq(intColumn, 101), columnMetas));
+
+    assertTrue(canDrop(ltEq(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(ltEq(intColumn, 7), nullColumnMetas));
+  }
+
+  @Test
+  public void testGt() {
+    assertFalse(canDrop(gt(intColumn, 9), columnMetas));
+    assertFalse(canDrop(gt(intColumn, 10), columnMetas));
+    assertTrue(canDrop(gt(intColumn, 100), columnMetas));
+    assertTrue(canDrop(gt(intColumn, 101), columnMetas));
+
+    assertTrue(canDrop(gt(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(gt(intColumn, 7), nullColumnMetas));
+  }
+
+  @Test
+  public void testGtEq() {
+    assertFalse(canDrop(gtEq(intColumn, 9), columnMetas));
+    assertFalse(canDrop(gtEq(intColumn, 10), columnMetas));
+    assertFalse(canDrop(gtEq(intColumn, 100), columnMetas));
+    assertTrue(canDrop(gtEq(intColumn, 101), columnMetas));
+
+    assertTrue(canDrop(gtEq(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(gtEq(intColumn, 7), nullColumnMetas));
+  }
+
+  @Test
+  public void testAnd() {
+    FilterPredicate yes = eq(intColumn, 9);
+    FilterPredicate no = eq(doubleColumn, 50D);
+    assertTrue(canDrop(and(yes, yes), columnMetas));
+    assertFalse(canDrop(and(yes, no), columnMetas));
+    assertFalse(canDrop(and(no, yes), columnMetas));
+    assertFalse(canDrop(and(no, no), columnMetas));
+  }
+
+  @Test
+  public void testOr() {
+    FilterPredicate yes = eq(intColumn, 9);
+    FilterPredicate no = eq(doubleColumn, 50D);
+    assertTrue(canDrop(or(yes, yes), columnMetas));
+    assertFalse(canDrop(or(yes, no), columnMetas));
+    assertFalse(canDrop(or(no, yes), columnMetas));
+    assertFalse(canDrop(or(no, no), columnMetas));
+  }
+
+  public static class SevensAndEightsUdp extends UserDefinedPredicate<Integer> {
+
+    @Override
+    public boolean keep(Integer value) {
+      throw new RuntimeException("this method should not be called");
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Integer> statistics) {
+      return statistics.getMin() == 7 && statistics.getMax() == 7;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Integer> statistics) {
+      return statistics.getMin() == 8 && statistics.getMax() == 8;
+    }
+  }
+
+  @Test
+  public void testUdp() {
+    FilterPredicate pred = userDefined(intColumn, SevensAndEightsUdp.class);
+    FilterPredicate invPred = LogicalInverseRewriter.rewrite(not(userDefined(intColumn, SevensAndEightsUdp.class)));
+
+    IntStatistics seven = new IntStatistics();
+    seven.setMinMax(7, 7);
+
+    IntStatistics eight = new IntStatistics();
+    eight.setMinMax(8, 8);
+
+    IntStatistics neither = new IntStatistics();
+    neither.setMinMax(1, 2);
+
+    assertTrue(canDrop(pred, Arrays.asList(
+        getIntColumnMeta(seven, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(pred, Arrays.asList(
+        getIntColumnMeta(eight, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(pred, Arrays.asList(
+        getIntColumnMeta(neither, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(invPred, Arrays.asList(
+        getIntColumnMeta(seven, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertTrue(canDrop(invPred, Arrays.asList(
+        getIntColumnMeta(eight, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(invPred, Arrays.asList(
+        getIntColumnMeta(neither, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+  }
+
+  @Test
+  public void testClearExceptionForNots() {
+    List<ColumnChunkMetaData> columnMetas = Arrays.asList(
+        getIntColumnMeta(new IntStatistics(), 0L),
+        getDoubleColumnMeta(new DoubleStatistics(), 0L));
+
+    FilterPredicate pred = and(eq(intColumn, 17), not(eq(doubleColumn, 12.0)));
+
+    try {
+      canDrop(pred, columnMetas);
+      fail("This should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter?"
+          + " not(eq(double.column, 12.0))", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testMissingColumn() {
+    List<ColumnChunkMetaData> columnMetas = Arrays.asList(getIntColumnMeta(new IntStatistics(), 0L));
+    try {
+      canDrop(and(eq(intColumn, 17), eq(doubleColumn, 12.0)), columnMetas);
+      fail("This should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Column double.column not found in schema!", e.getMessage());
+    }
+  }
+
+}
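
The last two tests pin down the contract: canDrop refuses predicates that
still contain not(...), and every referenced column must exist in the block's
metadata. The intended flow is to rewrite first. A sketch, written as if it
were another test in TestStatisticsFilter (reusing intColumn, doubleColumn
and columnMetas from above):

    @Test
    public void testRewriteThenDrop() {
      FilterPredicate withNot = and(eq(intColumn, 17), not(eq(doubleColumn, 12.0)));

      // LogicalInverseRewriter pushes the not() down: not(eq) becomes notEq
      FilterPredicate rewritten = LogicalInverseRewriter.rewrite(withNot);
      assertEquals(and(eq(intColumn, 17), notEq(doubleColumn, 12.0)), rewritten);

      // now the statistics filter accepts it; 17 is within [10, 100], so no drop
      assertFalse(canDrop(rewritten, columnMetas));
    }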

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
index 1ab1cc5..1823716 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
@@ -15,18 +15,37 @@
  */
 package parquet.hadoop;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 import org.junit.Before;
 import org.junit.Test;
+
+import parquet.column.ColumnReader;
 import parquet.column.Encoding;
 import parquet.column.statistics.BinaryStatistics;
+import parquet.column.statistics.IntStatistics;
+import parquet.common.schema.ColumnPath;
+import parquet.filter.RecordFilter;
+import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.Operators.IntColumn;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.metadata.FileMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
@@ -35,16 +54,17 @@ import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Arrays;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
 
 public class TestInputFormat {
 
@@ -93,6 +113,25 @@ public class TestInputFormat {
     }
   }
 
+  @Test
+  public void testGetFilter() throws IOException {
+    IntColumn intColumn = intColumn("foo");
+    FilterPredicate p = or(eq(intColumn, 7), eq(intColumn, 12));
+    Configuration conf = new Configuration();
+    ParquetInputFormat.setFilterPredicate(conf, p);
+    Filter read = ParquetInputFormat.getFilter(conf);
+    assertTrue(read instanceof FilterPredicateCompat);
+    assertEquals(p, ((FilterPredicateCompat) read).getFilterPredicate());
+
+    conf = new Configuration();
+    ParquetInputFormat.setFilterPredicate(conf, not(p));
+    read = ParquetInputFormat.getFilter(conf);
+    assertTrue(read instanceof FilterPredicateCompat);
+    assertEquals(and(notEq(intColumn, 7), notEq(intColumn, 12)), ((FilterPredicateCompat) read).getFilterPredicate());
+
+    assertEquals(FilterCompat.NOOP, ParquetInputFormat.getFilter(new Configuration()));
+  }
+
   /*
     aaaaa bbbbb
    */
@@ -246,6 +285,57 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
   }
 
+  public static final class DummyUnboundRecordFilter implements UnboundRecordFilter {
+    @Override
+    public RecordFilter bind(Iterable<ColumnReader> readers) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testOnlyOneKindOfFilterSupported() throws Exception {
+    IntColumn foo = intColumn("foo");
+    FilterPredicate p = or(eq(foo, 10), eq(foo, 11));
+
+    Job job = new Job();
+
+    Configuration conf = job.getConfiguration();
+    ParquetInputFormat.setUnboundRecordFilter(job, DummyUnboundRecordFilter.class);
+    try {
+      ParquetInputFormat.setFilterPredicate(conf, p);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("You cannot provide a FilterPredicate after providing an UnboundRecordFilter", e.getMessage());
+    }
+
+    job = new Job();
+    conf = job.getConfiguration();
+
+    ParquetInputFormat.setFilterPredicate(conf, p);
+    try {
+      ParquetInputFormat.setUnboundRecordFilter(job, DummyUnboundRecordFilter.class);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("You cannot provide an UnboundRecordFilter after providing a FilterPredicate", e.getMessage());
+    }
+
+  }
+
+  public static BlockMetaData makeBlockFromStats(IntStatistics stats, long valueCount) {
+    BlockMetaData blockMetaData = new BlockMetaData();
+
+    ColumnChunkMetaData column = ColumnChunkMetaData.get(ColumnPath.get("foo"),
+        PrimitiveTypeName.INT32,
+        CompressionCodecName.GZIP,
+        new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+        stats,
+        100L, 100L, valueCount, 100L, 100L);
+    blockMetaData.addColumn(column);
+    blockMetaData.setTotalByteSize(200L);
+    blockMetaData.setRowCount(valueCount);
+    return blockMetaData;
+  }
+
   @Test
   public void testFooterCacheValueIsCurrent() throws IOException, InterruptedException {
     File tempFile = getTempFile();
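
For MapReduce users the wiring is a single call on the job configuration, as
testGetFilter and testOnlyOneKindOfFilterSupported exercise above; the
framework later recovers a unified Filter. Sketch (the wrapper class is
illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import parquet.filter2.compat.FilterCompat.Filter;
    import parquet.filter2.predicate.Operators.IntColumn;
    import parquet.hadoop.ParquetInputFormat;

    import static parquet.filter2.predicate.FilterApi.*;

    public class JobWiringSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        Configuration conf = job.getConfiguration();

        IntColumn foo = intColumn("foo");
        ParquetInputFormat.setFilterPredicate(conf, and(gtEq(foo, 10), ltEq(foo, 17)));

        // readers pull the unified Filter back out of the configuration
        Filter filter = ParquetInputFormat.getFilter(conf);
      }
    }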

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/test/java/parquet/hadoop/metadata/TestColumnChunkMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/metadata/TestColumnChunkMetaData.java b/parquet-hadoop/src/test/java/parquet/hadoop/metadata/TestColumnChunkMetaData.java
index a449818..9ecb238 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/metadata/TestColumnChunkMetaData.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/metadata/TestColumnChunkMetaData.java
@@ -1,9 +1,5 @@
 package parquet.hadoop.metadata;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-
 import java.util.HashSet;
 import java.util.Set;
 
@@ -11,8 +7,13 @@ import org.junit.Test;
 
 import parquet.column.Encoding;
 import parquet.column.statistics.BinaryStatistics;
+import parquet.common.schema.ColumnPath;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
 public class TestColumnChunkMetaData {
 
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/test/java/parquet/hadoop/util/TestSerializationUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/util/TestSerializationUtil.java b/parquet-hadoop/src/test/java/parquet/hadoop/util/TestSerializationUtil.java
new file mode 100644
index 0000000..9b305db
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/util/TestSerializationUtil.java
@@ -0,0 +1,53 @@
+package parquet.hadoop.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Serialization utils copied from:
+ * https://github.com/kevinweil/elephant-bird/blob/master/core/src/test/java/com/twitter/elephantbird/util/TestHadoopUtils.java
+ *
+ * TODO: Refactor elephant-bird so that we can depend on utils like this without extra baggage.
+ */
+public class TestSerializationUtil {
+
+  @Test
+  public void testReadWriteObjectToConfAsBase64() throws Exception {
+    Map<Integer, String> anObject = new HashMap<Integer, String>();
+    anObject.put(7, "seven");
+    anObject.put(8, "eight");
+
+    Configuration conf = new Configuration();
+
+    SerializationUtil.writeObjectToConfAsBase64("anobject", anObject, conf);
+    Map<Integer, String> copy = SerializationUtil.readObjectFromConfAsBase64("anobject", conf);
+    assertEquals(anObject, copy);
+
+    try {
+      Set<String> bad = SerializationUtil.readObjectFromConfAsBase64("anobject", conf);
+      fail("This should throw a ClassCastException");
+    } catch (ClassCastException e) {
+      // expected: the stored HashMap cannot be cast to a Set
+    }
+
+    conf = new Configuration();
+    Object nullObj = null;
+
+    SerializationUtil.writeObjectToConfAsBase64("anobject", null, conf);
+    Object copyObj = SerializationUtil.readObjectFromConfAsBase64("anobject", conf);
+    assertEquals(nullObj, copyObj);
+  }
+
+  @Test
+  public void readObjectFromConfAsBase64UnsetKey() throws Exception {
+    assertNull(SerializationUtil.readObjectFromConfAsBase64("non-existent-key", new Configuration()));
+  }
+}
\ No newline at end of file
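
These utils are presumably also what carries the FilterPredicate through the
Configuration in setFilterPredicate/getFilter above. A round-trip sketch (the
key name is illustrative, not the one the library uses, and it assumes
predicates are Serializable):

    import org.apache.hadoop.conf.Configuration;

    import parquet.filter2.predicate.FilterPredicate;
    import parquet.hadoop.util.SerializationUtil;

    import static parquet.filter2.predicate.FilterApi.eq;
    import static parquet.filter2.predicate.FilterApi.intColumn;

    public class ConfRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        SerializationUtil.writeObjectToConfAsBase64("my.filter.key",
            eq(intColumn("foo"), 7), conf);

        FilterPredicate copy =
            SerializationUtil.readObjectFromConfAsBase64("my.filter.key", conf);
      }
    }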

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive-bundle/pom.xml b/parquet-hive-bundle/pom.xml
index 9b55437..6b6fda2 100644
--- a/parquet-hive-bundle/pom.xml
+++ b/parquet-hive-bundle/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml
index 652661e..4b03f95 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet-hive-binding</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml
index 54abca7..b5ada25 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet-hive-binding</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml
index 00966c9..420beb6 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml
@@ -17,7 +17,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet-hive-binding</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml
index 2c30d07..2749bbc 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet-hive-binding</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml
index d326f84..e417d13 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet-hive-binding</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive/parquet-hive-binding/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-binding/pom.xml b/parquet-hive/parquet-hive-binding/pom.xml
index aa89505..a11ff69 100644
--- a/parquet-hive/parquet-hive-binding/pom.xml
+++ b/parquet-hive/parquet-hive-binding/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet-hive</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive/parquet-hive-storage-handler/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-storage-handler/pom.xml b/parquet-hive/parquet-hive-storage-handler/pom.xml
index dfafd4a..01dbc4c 100644
--- a/parquet-hive/parquet-hive-storage-handler/pom.xml
+++ b/parquet-hive/parquet-hive-storage-handler/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet-hive</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hive/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hive/pom.xml b/parquet-hive/pom.xml
index d7866fb..cfb3480 100644
--- a/parquet-hive/pom.xml
+++ b/parquet-hive/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-jackson/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-jackson/pom.xml b/parquet-jackson/pom.xml
index 265d48e..acffda6 100644
--- a/parquet-jackson/pom.xml
+++ b/parquet-jackson/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-pig-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-pig-bundle/pom.xml b/parquet-pig-bundle/pom.xml
index 82231d5..ab5c11d 100644
--- a/parquet-pig-bundle/pom.xml
+++ b/parquet-pig-bundle/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-pig/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml
index ad30441..5caf926 100644
--- a/parquet-pig/pom.xml
+++ b/parquet-pig/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml
index e1e39dc..f46a35f 100644
--- a/parquet-protobuf/pom.xml
+++ b/parquet-protobuf/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetReader.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetReader.java
index efd6b12..6ea9d55 100644
--- a/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetReader.java
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetReader.java
@@ -15,25 +15,40 @@
  */
 package parquet.proto;
 
+import java.io.IOException;
+
 import com.google.protobuf.MessageOrBuilder;
+
 import org.apache.hadoop.fs.Path;
+
 import parquet.filter.UnboundRecordFilter;
 import parquet.hadoop.ParquetReader;
-import parquet.hadoop.api.ReadSupport;
-
-import java.io.IOException;
 
 /**
  * Read Protobuf records from a Parquet file.
  */
 public class ProtoParquetReader<T extends MessageOrBuilder> extends ParquetReader<T> {
 
+  @SuppressWarnings("unchecked")
+  public static <T extends MessageOrBuilder> Builder<T> builder(Path file) {
+    return ParquetReader.builder(new ProtoReadSupport(), file);
+  }
+
+  /**
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
+  @SuppressWarnings("unchecked")
   public ProtoParquetReader(Path file) throws IOException {
     super(file, new ProtoReadSupport());
   }
 
+  /**
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
+  @SuppressWarnings("unchecked")
   public ProtoParquetReader(Path file, UnboundRecordFilter recordFilter) throws IOException {
     super(file, new ProtoReadSupport(), recordFilter);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-scala/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-scala/pom.xml b/parquet-scala/pom.xml
new file mode 100644
index 0000000..33663b5
--- /dev/null
+++ b/parquet-scala/pom.xml
@@ -0,0 +1,67 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>com.twitter</groupId>
+    <artifactId>parquet</artifactId>
+    <relativePath>../pom.xml</relativePath>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>parquet-scala</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Parquet Scala</name>
+  <url>https://github.com/Parquet/parquet-mr</url>
+
+  <repositories>
+    <repository>
+      <id>scala-tools.org</id>
+      <name>Scala-tools Maven2 Repository</name>
+      <url>http://scala-tools.org/repo-releases</url>
+    </repository>
+  </repositories>
+  <pluginRepositories>
+    <pluginRepository>
+      <id>scala-tools.org</id>
+      <name>Scala-tools Maven2 Repository</name>
+      <url>http://scala-tools.org/repo-releases</url>
+    </pluginRepository>
+  </pluginRepositories>
+    
+  <dependencies>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${project.version}</version>
+    </dependency>  
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.9.3</artifactId>
+      <version>1.9.2</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala
----------------------------------------------------------------------
diff --git a/parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala b/parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala
new file mode 100644
index 0000000..c60b804
--- /dev/null
+++ b/parquet-scala/src/main/scala/parquet/filter2/dsl/Dsl.scala
@@ -0,0 +1,89 @@
+package parquet.filter2.dsl
+
+import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong}
+
+import parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators, UserDefinedPredicate}
+import parquet.io.api.Binary
+
+/**
+ * Instead of using the methods in [[FilterApi]] directly in scala code,
+ * use this Dsl instead. Example usage:
+ *
+ * {{{
+ * import parquet.filter2.dsl.Dsl._
+ *
+ * val abc = IntColumn("a.b.c")
+ * val xyz = DoubleColumn("x.y.z")
+ *
+ * val myPredicate = !(abc > 10 && (xyz === 17 || ((xyz !== 13) && (xyz <= 20))))
+ *
+ * }}}
+ *
+ * Note that while the operators >, >=, <, <= all work, the == and != operators do not.
+ * Using == or != will result in a runtime exception. Instead, use === and !==.
+ *
+ * This is due to a limitation in overriding the equals method.
+ */
+object Dsl {
+
+  private[Dsl] trait Column[T <: Comparable[T], C <: Operators.Column[T]] {
+    val javaColumn: C
+
+    def filterBy[U <: UserDefinedPredicate[T]](clazz: Class[U]) = FilterApi.userDefined(javaColumn, clazz)
+
+    // this is not supported because it allows for easy mistakes. For example:
+    // val pred = IntColumn("foo") == "hello"
+    // will compile, but pred will be of type boolean instead of FilterPredicate
+    override def equals(x: Any) =
+      throw new UnsupportedOperationException("You probably meant to use === or !==")
+  }
+
+  case class IntColumn(columnPath: String) extends Column[JInt, Operators.IntColumn] {
+    override val javaColumn = FilterApi.intColumn(columnPath)
+  }
+
+  case class LongColumn(columnPath: String) extends Column[JLong, Operators.LongColumn] {
+    override val javaColumn = FilterApi.longColumn(columnPath)
+  }
+
+  case class FloatColumn(columnPath: String) extends Column[JFloat, Operators.FloatColumn] {
+    override val javaColumn = FilterApi.floatColumn(columnPath)
+  }
+
+  case class DoubleColumn(columnPath: String) extends Column[JDouble, Operators.DoubleColumn] {
+    override val javaColumn = FilterApi.doubleColumn(columnPath)
+  }
+
+  case class BooleanColumn(columnPath: String) extends Column[JBoolean, Operators.BooleanColumn] {
+    override val javaColumn = FilterApi.booleanColumn(columnPath)
+  }
+
+  case class BinaryColumn(columnPath: String) extends Column[Binary, Operators.BinaryColumn] {
+    override val javaColumn = FilterApi.binaryColumn(columnPath)
+  }
+
+  implicit def enrichEqNotEq[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsEqNotEq](column: Column[T, C]): SupportsEqNotEq[T,C] = new SupportsEqNotEq(column)
+
+  class SupportsEqNotEq[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsEqNotEq](val column: Column[T, C]) {
+    def ===(v: T) = FilterApi.eq(column.javaColumn, v)
+    def !== (v: T) = FilterApi.notEq(column.javaColumn, v)
+  }
+
+  implicit def enrichLtGt[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsLtGt](column: Column[T, C]): SupportsLtGt[T,C] = new SupportsLtGt(column)
+
+  class SupportsLtGt[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsLtGt](val column: Column[T, C]) {
+    def >(v: T) = FilterApi.gt(column.javaColumn, v)
+    def >=(v: T) = FilterApi.gtEq(column.javaColumn, v)
+    def <(v: T) = FilterApi.lt(column.javaColumn, v)
+    def <=(v: T) = FilterApi.ltEq(column.javaColumn, v)
+  }
+
+  implicit def enrichPredicate(pred: FilterPredicate): RichPredicate = new RichPredicate(pred)
+
+  class RichPredicate(val pred: FilterPredicate) {
+    def &&(other: FilterPredicate) = FilterApi.and(pred, other)
+    def ||(other: FilterPredicate) = FilterApi.or(pred, other)
+    def unary_! = FilterApi.not(pred)
+  }
+
+}
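
For context, here is a minimal sketch (not part of the patch) of how a Dsl-built predicate can be handed to the readers via the FilterCompat layer introduced in this change; the column paths and literal values below are hypothetical:

  import parquet.filter2.compat.FilterCompat
  import parquet.filter2.dsl.Dsl._

  val foo = IntColumn("foo")        // hypothetical column path
  val bar = DoubleColumn("x.y.bar") // hypothetical column path

  // the overloaded operators build an ordinary FilterPredicate...
  val pred = (foo === 10) || (bar > 1.5)

  // ...which FilterCompat.get wraps into the Filter type the readers accept
  val filter: FilterCompat.Filter = FilterCompat.get(pred)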

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-scala/src/test/scala/parquet/filter2/dsl/DslTest.scala
----------------------------------------------------------------------
diff --git a/parquet-scala/src/test/scala/parquet/filter2/dsl/DslTest.scala b/parquet-scala/src/test/scala/parquet/filter2/dsl/DslTest.scala
new file mode 100644
index 0000000..23aa537
--- /dev/null
+++ b/parquet-scala/src/test/scala/parquet/filter2/dsl/DslTest.scala
@@ -0,0 +1,61 @@
+package parquet.filter2.dsl
+
+import java.lang.{Double => JDouble, Integer => JInt}
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import parquet.filter2.predicate.Operators.{Or, UserDefined, DoubleColumn => JDoubleColumn, IntColumn => JIntColumn}
+import parquet.filter2.predicate.{FilterApi, Statistics, UserDefinedPredicate}
+
+class DummyFilter extends UserDefinedPredicate[JInt] {
+  override def keep(value: JInt): Boolean = false
+
+  override def canDrop(statistics: Statistics[JInt]): Boolean = false
+
+  override def inverseCanDrop(statistics: Statistics[JInt]): Boolean = false
+}
+
+@RunWith(classOf[JUnitRunner])
+class DslTest extends FlatSpec {
+  import parquet.filter2.dsl.Dsl._
+
+  "predicates" should "be correctly constructed using the dsl" in {
+    val abc = IntColumn("a.b.c")
+    val xyz = DoubleColumn("x.y.z")
+
+    val complexPredicate = !(abc > 10 && (xyz === 17 || ((xyz !== 13) && (xyz <= 20))))
+    val abcGt = FilterApi.gt[JInt, JIntColumn](abc.javaColumn, 10)
+    val xyzAnd = FilterApi.and(FilterApi.notEq[JDouble, JDoubleColumn](xyz.javaColumn, 13.0),
+      FilterApi.ltEq[JDouble, JDoubleColumn](xyz.javaColumn, 20.0))
+    val xyzEq = FilterApi.eq[JDouble, JDoubleColumn](xyz.javaColumn, 17.0)
+    val xyzPred = FilterApi.or(xyzEq, xyzAnd)
+    val expected = FilterApi.not(FilterApi.and(abcGt, xyzPred))
+
+    assert(complexPredicate === expected)
+  }
+
+  "user defined predicates" should "be correctly constructed" in {
+    val abc = IntColumn("a.b.c")
+    val pred = (abc > 10) || abc.filterBy(classOf[DummyFilter])
+
+    val expected = FilterApi.or(FilterApi.gt[JInt, JIntColumn](abc.javaColumn, 10), FilterApi.userDefined(abc.javaColumn, classOf[DummyFilter]))
+    assert(pred === expected)
+    val intUserDefined = pred.asInstanceOf[Or].getRight.asInstanceOf[UserDefined[JInt, DummyFilter]]
+
+    assert(intUserDefined.getUserDefinedPredicateClass === classOf[DummyFilter])
+    assert(intUserDefined.getUserDefinedPredicate.isInstanceOf[DummyFilter])
+  }
+
+  "Column == and != " should "throw a helpful warning" in {
+    val abc = IntColumn("a.b.c")
+
+    intercept[UnsupportedOperationException] {
+      abc == 10
+    }
+
+    intercept[UnsupportedOperationException] {
+      abc != 10
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-scrooge/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-scrooge/pom.xml b/parquet-scrooge/pom.xml
index 62ac360..a4f4366 100644
--- a/parquet-scrooge/pom.xml
+++ b/parquet-scrooge/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
@@ -86,7 +86,7 @@
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
-      <version>2.9.2</version>
+      <version>${scala.version}</version>
     </dependency>
     <dependency>
       <groupId>cascading</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-test-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-test-hadoop2/pom.xml b/parquet-test-hadoop2/pom.xml
index 6d543dc..2a32332 100644
--- a/parquet-test-hadoop2/pom.xml
+++ b/parquet-test-hadoop2/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-thrift/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index baf3bc5..890e402 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetReader.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetReader.java
index 09a5f72..fe779a8 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetReader.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftParquetReader.java
@@ -21,13 +21,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TBase;
 
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
 import parquet.hadoop.ParquetReader;
+import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.thrift.ThriftReadSupport;
 
+import static parquet.Preconditions.checkNotNull;
+
 /**
  * To read a parquet file into thrift objects
  * @author Julien Le Dem
- *
  * @param <T> the thrift type
  */
 public class ThriftParquetReader<T extends TBase<?,?>> extends ParquetReader<T> {
@@ -36,7 +40,9 @@ public class ThriftParquetReader<T extends TBase<?,?>> extends ParquetReader<T>
    * @param file the file to read
    * @param thriftClass the class used to read
    * @throws IOException
+   * @deprecated use {@link #build(Path)}
    */
+  @Deprecated
   public ThriftParquetReader(Path file, Class<T> thriftClass) throws IOException {
     super(file, new ThriftReadSupport<T>(thriftClass));
   }
@@ -46,7 +52,9 @@ public class ThriftParquetReader<T extends TBase<?,?>> extends ParquetReader<T>
    * @param file the file to read
    * @param thriftClass the class used to read
    * @throws IOException
+   * @deprecated use {@link #build(Path)}
    */
+  @Deprecated
   public ThriftParquetReader(Configuration conf, Path file, Class<T> thriftClass) throws IOException {
     super(conf, file, new ThriftReadSupport<T>(thriftClass));
   }
@@ -55,7 +63,9 @@ public class ThriftParquetReader<T extends TBase<?,?>> extends ParquetReader<T>
    * will use the thrift class based on the file metadata if a thrift class information is present
    * @param file the file to read
    * @throws IOException
+   * @deprecated use {@link #build(Path)}
    */
+  @Deprecated
   public ThriftParquetReader(Path file) throws IOException {
     super(file, new ThriftReadSupport<T>());
   }
@@ -65,9 +75,61 @@ public class ThriftParquetReader<T extends TBase<?,?>> extends ParquetReader<T>
    * @param conf the configuration
    * @param file the file to read
    * @throws IOException
+   * @deprecated use {@link #build(Path)}
    */
+  @Deprecated
   public ThriftParquetReader(Configuration conf, Path file) throws IOException {
     super(conf, file, new ThriftReadSupport<T>());
   }
 
+  public static <T extends TBase<?,?>> Builder<T> build(Path file) {
+    return new Builder<T>(file);
+  }
+
+  public static class Builder<T extends TBase<?,?>> {
+    private final Path file;
+    private Configuration conf;
+    private Filter filter;
+    private Class<T> thriftClass;
+
+    private Builder(Path file) {
+      this.file = checkNotNull(file, "file");
+      this.conf = new Configuration();
+      this.filter = FilterCompat.NOOP;
+      this.thriftClass = null;
+    }
+
+    public Builder<T> withConf(Configuration conf) {
+      this.conf = checkNotNull(conf, "conf");
+      return this;
+    }
+
+    public Builder<T> withFilter(Filter filter) {
+      this.filter = checkNotNull(filter, "filter");
+      return this;
+    }
+
+    /**
+     * If this is called, the given thrift class is used to read the file.
+     * If not, the thrift class is determined from the file metadata,
+     * provided thrift class information is present there.
+     */
+    public Builder<T> withThriftClass(Class<T> thriftClass) {
+      this.thriftClass = checkNotNull(thriftClass, "thriftClass");
+      return this;
+    }
+
+    public ParquetReader<T> build() throws IOException {
+      ReadSupport<T> readSupport;
+
+      if (thriftClass != null) {
+        readSupport = new ThriftReadSupport<T>(thriftClass);
+      } else {
+        readSupport = new ThriftReadSupport<T>();
+      }
+
+      return ParquetReader.builder(readSupport, file).withConf(conf).withFilter(filter).build();
+    }
+  }
+
 }
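
For completeness, a sketch of the new builder in use, written in Scala so it composes with the Dsl above (again not part of the patch; MyThriftRecord stands in for a hypothetical Thrift-generated class, and the path and column name are made up):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import parquet.filter2.compat.FilterCompat
  import parquet.filter2.dsl.Dsl._
  import parquet.thrift.ThriftParquetReader

  val id = LongColumn("id") // hypothetical column path

  // build(...) replaces the now-deprecated constructors; withThriftClass is
  // optional, and when omitted the class is taken from the file metadata
  val reader = ThriftParquetReader.build[MyThriftRecord](new Path("/tmp/data.parquet"))
    .withConf(new Configuration())
    .withThriftClass(classOf[MyThriftRecord])
    .withFilter(FilterCompat.get(id === 7L))
    .build()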

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java
index bf0a7df..4157693 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java
@@ -811,6 +811,11 @@ public class ThriftRecordConverter<T> extends RecordMaterializer<T> {
     }
   }
 
+  @Override
+  public void skipCurrentRecord() {
+    rootEvents.clear();
+  }
+
   /**
    *
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-tools/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-tools/pom.xml b/parquet-tools/pom.xml
index 6220deb..c649334 100644
--- a/parquet-tools/pom.xml
+++ b/parquet-tools/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java
index 5c9c6c3..387c6bb 100644
--- a/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java
+++ b/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java
@@ -314,7 +314,6 @@ public class DumpCommand extends ArgsOnlyCommand {
     }
 
     private static final class DumpConverter extends PrimitiveConverter {
-        @Override public boolean isPrimitive() { return true; }
         @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8b9595a..fddf204 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
 
   <groupId>com.twitter</groupId>
   <artifactId>parquet</artifactId>
-  <version>1.5.1-SNAPSHOT</version>
+  <version>1.6.0-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <name>Parquet MR</name>
@@ -87,8 +87,9 @@
     <cascading.version>2.5.3</cascading.version>
     <parquet.format.version>2.1.0</parquet.format.version>
     <log4j.version>1.2.17</log4j.version>
-    <previous.version>1.4.0</previous.version>
+    <previous.version>1.5.0</previous.version>
     <thrift.executable>thrift</thrift.executable>
+    <scala.version>2.9.2</scala.version>
   </properties>
 
   <modules>
@@ -103,6 +104,7 @@
     <module>parquet-pig</module>
     <module>parquet-pig-bundle</module>
     <module>parquet-protobuf</module>
+    <module>parquet-scala</module>
     <module>parquet-scrooge</module>
     <module>parquet-thrift</module>
     <module>parquet-test-hadoop2</module>
@@ -120,6 +122,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>commons-httpclient</groupId>
       <artifactId>commons-httpclient</artifactId>
       <version>3.0.1</version>
@@ -201,6 +209,8 @@
                    <previousVersion>${previous.version}</previousVersion>
                    <excludes>
                      <exclude>parquet/org/**</exclude>
+                     <!-- one-time exclusions that should be removed -->
+                     <exclude>parquet/io/api/Binary</exclude>
                    </excludes>
                  </requireBackwardCompatibility>
                </rules>

