parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [39/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:36 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter/AndRecordFilter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter/AndRecordFilter.java b/parquet-column/src/main/java/org/apache/parquet/filter/AndRecordFilter.java
new file mode 100644
index 0000000..f19119b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter/AndRecordFilter.java
@@ -0,0 +1,64 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnReader;
+
+/**
+ * Record filter combining two other filters with a logical AND. Bear in mind that
+ * the first filter short circuits the second: when the first does not match, the
+ * second is not evaluated at all. Useful if getting a page of already filtered results,
+ *  e.g. and( column("manufacturer", equalTo("Volkswagen")), page(100,50))
+ *
+ * @author Jacob Metcalf
+ */
+public final class AndRecordFilter implements RecordFilter {
+
+  private final RecordFilter boundFilter1;
+  private final RecordFilter boundFilter2;
+
+  /**
+   * Returns builder for creating an and filter. Both operands are bound, first
+   * then second, every time the returned builder is bound.
+   * @param filter1 The first filter to check.
+   * @param filter2 The second filter to check.
+   */
+  public static final UnboundRecordFilter and( final UnboundRecordFilter filter1, final UnboundRecordFilter filter2 ) {
+    Preconditions.checkNotNull( filter1, "filter1" );
+    Preconditions.checkNotNull( filter2, "filter2" );
+    return new UnboundRecordFilter() {
+      @Override
+      public RecordFilter bind(Iterable<ColumnReader> readers) {
+        final RecordFilter first = filter1.bind(readers);
+        final RecordFilter second = filter2.bind(readers);
+        return new AndRecordFilter(first, second);
+      }
+    };
+  }
+
+  /**
+   * Private constructor, use AndRecordFilter.and() instead.
+   */
+  private AndRecordFilter( RecordFilter first, RecordFilter second ) {
+    boundFilter1 = first;
+    boundFilter2 = second;
+  }
+
+  /**
+   * @return true only if both wrapped filters match; the second filter is
+   *         consulted only after the first one has matched.
+   */
+  @Override
+  public boolean isMatch() {
+    if (!boundFilter1.isMatch()) {
+      return false;
+    }
+    return boundFilter2.isMatch();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter/ColumnPredicates.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter/ColumnPredicates.java b/parquet-column/src/main/java/org/apache/parquet/filter/ColumnPredicates.java
new file mode 100644
index 0000000..6bcdace
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter/ColumnPredicates.java
@@ -0,0 +1,190 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * ColumnPredicates class provides checks for column values. Factory methods
+ * are provided for standard predicates which wrap the job of getting the
+ * correct value from the column.
+ */
+public class ColumnPredicates {
+
+  /**
+   * A check applied to the current value of a column reader.
+   */
+  public static interface Predicate {
+    /**
+     * @param input column reader whose current value is tested
+     * @return true if the value satisfies the predicate
+     */
+    boolean apply(ColumnReader input);
+  }
+
+  /**
+   * User supplied test over an object value (used in this class for String
+   * and Binary values).
+   */
+  public static interface PredicateFunction <T> {
+    boolean functionToApply(T input);
+  }
+
+  /* provide the following to avoid boxing primitives */
+
+  /** User supplied test over an int value. */
+  public static interface IntegerPredicateFunction {
+    boolean functionToApply(int input);
+  }
+
+  /** User supplied test over a long value. */
+  public static interface LongPredicateFunction {
+    boolean functionToApply(long input);
+  }
+
+  /** User supplied test over a float value. */
+  public static interface FloatPredicateFunction {
+    boolean functionToApply(float input);
+  }
+
+  /** User supplied test over a double value. */
+  public static interface DoublePredicateFunction {
+    boolean functionToApply(double input);
+  }
+
+  /** User supplied test over a boolean value. */
+  public static interface BooleanPredicateFunction {
+    boolean functionToApply(boolean input);
+  }
+
+  /**
+   * Predicate matching when the column's binary value, decoded as UTF-8,
+   * equals the target string.
+   * @param target the string to compare against, must not be null
+   */
+  public static Predicate equalTo(final String target) {
+    Preconditions.checkNotNull(target,"target");
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return target.equals(input.getBinary().toStringUsingUTF8());
+      }
+    };
+  }
+
+  /**
+   * Predicate delegating to the supplied function, which receives the column's
+   * binary value decoded as a UTF-8 string.
+   * @param fn the function to apply, must not be null (checked when the predicate is created)
+   */
+  public static Predicate applyFunctionToString(final PredicateFunction<String> fn) {
+    Preconditions.checkNotNull(fn, "fn");
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return fn.functionToApply(input.getBinary().toStringUsingUTF8());
+      }
+    };
+  }
+
+  /**
+   * Predicate matching when the column's int value equals the target.
+   */
+  public static Predicate equalTo(final int target) {
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return input.getInteger() == target;
+      }
+    };
+  }
+
+  /**
+   * Predicate delegating to the supplied function, which receives the column's int value.
+   * @param fn the function to apply, must not be null (checked when the predicate is created)
+   */
+  public static Predicate applyFunctionToInteger(final IntegerPredicateFunction fn) {
+    Preconditions.checkNotNull(fn, "fn");
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return fn.functionToApply(input.getInteger());
+      }
+    };
+  }
+
+  /**
+   * Predicate matching when the column's long value equals the target.
+   */
+  public static Predicate equalTo(final long target) {
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return input.getLong() == target;
+      }
+    };
+  }
+
+  /**
+   * Predicate delegating to the supplied function, which receives the column's long value.
+   * @param fn the function to apply, must not be null (checked when the predicate is created)
+   */
+  public static Predicate applyFunctionToLong(final LongPredicateFunction fn) {
+    Preconditions.checkNotNull(fn, "fn");
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return fn.functionToApply(input.getLong());
+      }
+    };
+  }
+
+  /**
+   * Predicate matching when the column's float value equals the target.
+   * The comparison uses ==, so a NaN target never matches.
+   */
+  public static Predicate equalTo(final float target) {
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return input.getFloat() == target;
+      }
+    };
+  }
+
+  /**
+   * Predicate delegating to the supplied function, which receives the column's float value.
+   * @param fn the function to apply, must not be null (checked when the predicate is created)
+   */
+  public static Predicate applyFunctionToFloat(final FloatPredicateFunction fn) {
+    Preconditions.checkNotNull(fn, "fn");
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return fn.functionToApply(input.getFloat());
+      }
+    };
+  }
+
+  /**
+   * Predicate matching when the column's double value equals the target.
+   * The comparison uses ==, so a NaN target never matches.
+   */
+  public static Predicate equalTo(final double target) {
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return input.getDouble() == target;
+      }
+    };
+  }
+
+  /**
+   * Predicate delegating to the supplied function, which receives the column's double value.
+   * @param fn the function to apply, must not be null (checked when the predicate is created)
+   */
+  public static Predicate applyFunctionToDouble(final DoublePredicateFunction fn) {
+    Preconditions.checkNotNull(fn, "fn");
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return fn.functionToApply(input.getDouble());
+      }
+    };
+  }
+
+  /**
+   * Predicate matching when the column's boolean value equals the target.
+   */
+  public static Predicate equalTo(final boolean target) {
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return input.getBoolean() == target;
+      }
+    };
+  }
+
+  /**
+   * Predicate delegating to the supplied function, which receives the column's boolean value.
+   * @param fn the function to apply, must not be null (checked when the predicate is created)
+   */
+  public static Predicate applyFunctionToBoolean (final BooleanPredicateFunction fn) {
+    Preconditions.checkNotNull(fn, "fn");
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return fn.functionToApply(input.getBoolean());
+      }
+    };
+  }
+
+  /**
+   * Predicate matching when the column's binary value, decoded as UTF-8,
+   * equals the name() of the target enum constant.
+   * @param target the enum constant to compare against, must not be null
+   */
+  public static <E extends Enum<?>> Predicate equalTo(final E target) {
+    Preconditions.checkNotNull(target,"target");
+    // resolve the name once rather than on every apply() call
+    final String targetAsString = target.name();
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return targetAsString.equals(input.getBinary().toStringUsingUTF8());
+      }
+    };
+  }
+
+  /**
+   * Predicate delegating to the supplied function, which receives the column's
+   * raw Binary value.
+   * @param fn the function to apply, must not be null (checked when the predicate is created)
+   */
+  public static Predicate applyFunctionToBinary (final PredicateFunction<Binary> fn) {
+    Preconditions.checkNotNull(fn, "fn");
+    return new Predicate() {
+      @Override
+      public boolean apply(ColumnReader input) {
+        return fn.functionToApply(input.getBinary());
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter/ColumnRecordFilter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter/ColumnRecordFilter.java b/parquet-column/src/main/java/org/apache/parquet/filter/ColumnRecordFilter.java
new file mode 100644
index 0000000..e0ba607
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter/ColumnRecordFilter.java
@@ -0,0 +1,75 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter;
+
+import org.apache.parquet.column.ColumnReader;
+import java.util.Arrays;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Record filter which applies the supplied predicate to the specified column.
+ */
+public final class ColumnRecordFilter implements RecordFilter {
+
+  private final ColumnReader filterOnColumn;
+  private final ColumnPredicates.Predicate filterPredicate;
+
+  /**
+   * Builds an unbound filter that evaluates the given predicate against one column.
+   * At bind time the first reader whose descriptor path equals the requested path is
+   * chosen. Note that if searching for a repeated sub-attribute it will only ever
+   * match against the first instance of it in the object.
+   *
+   * @param columnPath Dot separated path specifier, e.g. "engine.capacity"
+   * @param predicate Should call getBinary etc. and check the value
+   */
+  public static final UnboundRecordFilter column(final String columnPath,
+                                                 final ColumnPredicates.Predicate predicate) {
+    checkNotNull(columnPath, "columnPath");
+    checkNotNull(predicate,  "predicate");
+    // split once up front; every bind() call compares against the same segments
+    final String[] wantedPath = columnPath.split("\\.");
+    return new UnboundRecordFilter() {
+      @Override
+      public RecordFilter bind(Iterable<ColumnReader> readers) {
+        for (ColumnReader candidate : readers) {
+          if ( !Arrays.equals( candidate.getDescriptor().getPath(), wantedPath)) {
+            continue;
+          }
+          return new ColumnRecordFilter(candidate, predicate);
+        }
+        throw new IllegalArgumentException( "Column " + columnPath + " does not exist.");
+      }
+    };
+  }
+
+  /**
+   * Private constructor. Use column() instead.
+   */
+  private ColumnRecordFilter(ColumnReader reader, ColumnPredicates.Predicate predicate) {
+    filterOnColumn  = reader;
+    filterPredicate = predicate;
+  }
+
+  /**
+   * @return true if the current value for the column reader matches the predicate.
+   */
+  @Override
+  public boolean isMatch() {
+    final boolean matches = filterPredicate.apply(filterOnColumn);
+    return matches;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter/NotRecordFilter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter/NotRecordFilter.java b/parquet-column/src/main/java/org/apache/parquet/filter/NotRecordFilter.java
new file mode 100644
index 0000000..192a7f0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter/NotRecordFilter.java
@@ -0,0 +1,58 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnReader;
+
+/**
+ * Provides ability to negate the result of a filter.
+ *
+ * @author Frank Austin Nothaft
+ */
+public final class NotRecordFilter implements RecordFilter {
+
+  private final RecordFilter boundFilter;
+
+  /**
+   * Returns builder for creating a not filter, i.e. a filter that matches
+   * exactly when the supplied filter does not.
+   * @param filter The filter to invert.
+   */
+  public static final UnboundRecordFilter not( final UnboundRecordFilter filter) {
+    Preconditions.checkNotNull( filter, "filter" );
+    return new UnboundRecordFilter() {
+      @Override
+      public RecordFilter bind(Iterable<ColumnReader> readers) {
+        final RecordFilter inner = filter.bind(readers);
+        return new NotRecordFilter(inner);
+      }
+    };
+  }
+
+  /**
+   * Private constructor, use NotRecordFilter.not() instead.
+   */
+  private NotRecordFilter( RecordFilter inner) {
+    boundFilter = inner;
+  }
+
+  /**
+   * @return the negation of the wrapped filter's result.
+   */
+  @Override
+  public boolean isMatch() {
+    final boolean innerMatches = boundFilter.isMatch();
+    return !innerMatches;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter/OrRecordFilter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter/OrRecordFilter.java b/parquet-column/src/main/java/org/apache/parquet/filter/OrRecordFilter.java
new file mode 100644
index 0000000..c1ece04
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter/OrRecordFilter.java
@@ -0,0 +1,62 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnReader;
+
+/**
+ * Record filter combining two other filters with a logical OR. The first filter
+ * short circuits the second: when the first matches, the second is not evaluated.
+ *
+ * @author Frank Austin Nothaft
+ */
+public final class OrRecordFilter implements RecordFilter {
+
+  private final RecordFilter boundFilter1;
+  private final RecordFilter boundFilter2;
+
+  /**
+   * Returns builder for creating an or filter. Both operands are bound, first
+   * then second, every time the returned builder is bound.
+   * @param filter1 The first filter to check.
+   * @param filter2 The second filter to check.
+   */
+  public static final UnboundRecordFilter or( final UnboundRecordFilter filter1, final UnboundRecordFilter filter2 ) {
+    Preconditions.checkNotNull( filter1, "filter1" );
+    Preconditions.checkNotNull( filter2, "filter2" );
+    return new UnboundRecordFilter() {
+      @Override
+      public RecordFilter bind(Iterable<ColumnReader> readers) {
+        final RecordFilter first = filter1.bind(readers);
+        final RecordFilter second = filter2.bind(readers);
+        return new OrRecordFilter(first, second);
+      }
+    };
+  }
+
+  /**
+   * Private constructor, use OrRecordFilter.or() instead.
+   */
+  private OrRecordFilter( RecordFilter first, RecordFilter second ) {
+    boundFilter1 = first;
+    boundFilter2 = second;
+  }
+
+  /**
+   * @return true if either wrapped filter matches; the second filter is
+   *         consulted only when the first one did not match.
+   */
+  @Override
+  public boolean isMatch() {
+    if (boundFilter1.isMatch()) {
+      return true;
+    }
+    return boundFilter2.isMatch();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter/PagedRecordFilter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter/PagedRecordFilter.java b/parquet-column/src/main/java/org/apache/parquet/filter/PagedRecordFilter.java
new file mode 100644
index 0000000..3a1891a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter/PagedRecordFilter.java
@@ -0,0 +1,64 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter;
+
+import org.apache.parquet.column.ColumnReader;
+
+/**
+ * Filter which will only materialize a page worth of results.
+ */
+public final class PagedRecordFilter implements RecordFilter {
+
+  private final long startPos;
+  // exclusive upper bound of the page
+  private final long endPos;
+  // number of isMatch() calls made so far; this filter is stateful
+  private long currentPos = 0;
+
+  /**
+   * Returns builder for creating a paged query. Each bind() of the returned
+   * builder creates a filter with its own, fresh record counter.
+   * @param startPos The record to start from, numbering starts at 1.
+   * @param pageSize The size of the page.
+   */
+  public static final UnboundRecordFilter page( final long startPos, final long pageSize ) {
+    return new UnboundRecordFilter() {
+      @Override
+      public RecordFilter bind(Iterable<ColumnReader> readers) {
+        return new PagedRecordFilter( startPos, pageSize );
+      }
+    };
+  }
+
+  /**
+   * Private constructor, use page() instead.
+   */
+  private PagedRecordFilter(long startPos, long pageSize) {
+    this.startPos = startPos;
+    this.endPos   = saturatingAdd( startPos, pageSize );
+  }
+
+  /**
+   * Adds two longs, clamping to Long.MAX_VALUE / Long.MIN_VALUE instead of wrapping
+   * around. Without this, e.g. page(1, Long.MAX_VALUE) would produce a negative end
+   * position and the filter would never match any record.
+   */
+  private static long saturatingAdd(long a, long b) {
+    final long sum = a + b;
+    // overflow happened iff both operands have the same sign and the sum's sign differs
+    if (((a ^ sum) & (b ^ sum)) < 0) {
+      return (a < 0) ? Long.MIN_VALUE : Long.MAX_VALUE;
+    }
+    return sum;
+  }
+
+  /**
+   * Keeps track of how many times it is called. Only returns matches when the
+   * record number is in the range [startPos, startPos + pageSize).
+   */
+  @Override
+  public boolean isMatch() {
+    currentPos++;
+    return (( currentPos >= startPos ) && ( currentPos < endPos ));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter/RecordFilter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter/RecordFilter.java b/parquet-column/src/main/java/org/apache/parquet/filter/RecordFilter.java
new file mode 100644
index 0000000..b5eece8
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter/RecordFilter.java
@@ -0,0 +1,34 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter;
+
+
+/**
+ * Filter to be applied to a record to work out whether to skip it.
+ * Takes no arguments: an implementation decides based on whatever it was
+ * given when it was created.
+ *
+ * @author Jacob Metcalf
+ */
+public interface RecordFilter {
+
+  /**
+   * Works out whether the current record can pass through the filter.
+   *
+   * @return true if the current record passes the filter, false if it
+   *         should be skipped.
+   */
+  boolean isMatch();
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter/UnboundRecordFilter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter/UnboundRecordFilter.java b/parquet-column/src/main/java/org/apache/parquet/filter/UnboundRecordFilter.java
new file mode 100644
index 0000000..5ddec24
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter/UnboundRecordFilter.java
@@ -0,0 +1,36 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter;
+
+import org.apache.parquet.column.ColumnReader;
+
+/**
+ * Builder for a record filter. Idea is that each filter provides a create function
+ * which returns an unbound filter. This only becomes a filter when it is bound to the actual
+ * columns.
+ *
+ * @author Jacob Metcalf
+ */
+public interface UnboundRecordFilter {
+
+  /**
+   * Call to bind to actual columns and create filter.
+   *
+   * @param readers the column readers to bind the filter to
+   * @return the filter bound to the supplied readers
+   */
+  RecordFilter bind( Iterable<ColumnReader> readers);
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java b/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
new file mode 100644
index 0000000..2efcc39
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
@@ -0,0 +1,158 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.compat;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Parquet currently has two ways to specify a filter for dropping records at read time.
+ * The first way, that only supports filtering records during record assembly, is found
+ * in {@link org.apache.parquet.filter}. The new API (found in {@link org.apache.parquet.filter2}) supports
+ * also filtering entire rowgroups of records without reading them at all.
+ *
+ * This class defines a common interface that both of these filters share,
+ * {@link Filter}. A Filter can be either an {@link UnboundRecordFilter} from the old API, or
+ * a {@link FilterPredicate} from the new API, or a sentinel no-op filter.
+ *
+ * Having this common interface simplifies passing a filter through the read path of parquet's
+ * codebase.
+ */
+public class FilterCompat {
+  private static final Log LOG = Log.getLog(FilterCompat.class);
+
+  /**
+   * Callback interface for consumers of a {@link Filter}: one visit overload
+   * per concrete filter kind, per the visitor pattern.
+   */
+  public static interface Visitor<T> {
+    T visit(FilterPredicateCompat filterPredicateCompat);
+    T visit(UnboundRecordFilterCompat unboundRecordFilterCompat);
+    T visit(NoOpFilter noOpFilter);
+  }
+
+  /**
+   * Common type of all filters handled by this class; dispatches to the
+   * matching {@link Visitor} overload.
+   */
+  public static interface Filter {
+    <R> R accept(Visitor<R> visitor);
+  }
+
+  // sentinel no op filter that signals "do no filtering"
+  public static final Filter NOOP = new NoOpFilter();
+
+  /**
+   * Wraps a FilterPredicate in a Filter. The predicate is logged, then rewritten
+   * so that it no longer contains the not() operator; if the rewrite changed
+   * anything, the rewritten form is logged as well.
+   */
+  public static Filter get(FilterPredicate filterPredicate) {
+    checkNotNull(filterPredicate, "filterPredicate");
+
+    LOG.info("Filtering using predicate: " + filterPredicate);
+
+    // the rewriter produces a predicate that does not include the not() operator
+    final FilterPredicate withoutNots = LogicalInverseRewriter.rewrite(filterPredicate);
+
+    final boolean unchanged = filterPredicate.equals(withoutNots);
+    if (!unchanged) {
+      LOG.info("Predicate has been collapsed to: " + withoutNots);
+    }
+
+    return new FilterPredicateCompat(withoutNots);
+  }
+
+  /**
+   * Wraps an UnboundRecordFilter in a Filter.
+   */
+  public static Filter get(UnboundRecordFilter unboundRecordFilter) {
+    final Filter wrapped = new UnboundRecordFilterCompat(unboundRecordFilter);
+    return wrapped;
+  }
+
+  /**
+   * Given either a FilterPredicate or an UnboundRecordFilter, or neither (but not both),
+   * return a Filter that wraps whichever was provided.
+   *
+   * At least one of filterPredicate and unboundRecordFilter must be null, or an
+   * exception is thrown.
+   *
+   * If both are null, the no op filter will be returned.
+   */
+  public static Filter get(FilterPredicate filterPredicate, UnboundRecordFilter unboundRecordFilter) {
+    final boolean hasPredicate = filterPredicate != null;
+    final boolean hasRecordFilter = unboundRecordFilter != null;
+
+    checkArgument(!(hasPredicate && hasRecordFilter),
+        "Cannot provide both a FilterPredicate and an UnboundRecordFilter");
+
+    if (hasPredicate) {
+      return get(filterPredicate);
+    }
+
+    return hasRecordFilter ? get(unboundRecordFilter) : NOOP;
+  }
+
+  // wraps a FilterPredicate
+  public static final class FilterPredicateCompat implements Filter {
+    private final FilterPredicate filterPredicate;
+
+    private FilterPredicateCompat(FilterPredicate predicate) {
+      filterPredicate = checkNotNull(predicate, "filterPredicate");
+    }
+
+    public FilterPredicate getFilterPredicate() {
+      return filterPredicate;
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  // wraps an UnboundRecordFilter
+  public static final class UnboundRecordFilterCompat implements Filter {
+    private final UnboundRecordFilter unboundRecordFilter;
+
+    private UnboundRecordFilterCompat(UnboundRecordFilter recordFilter) {
+      unboundRecordFilter = checkNotNull(recordFilter, "unboundRecordFilter");
+    }
+
+    public UnboundRecordFilter getUnboundRecordFilter() {
+      return unboundRecordFilter;
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  // sentinel no op filter
+  public static final class NoOpFilter implements Filter {
+    private NoOpFilter() {}
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
new file mode 100644
index 0000000..b73e59c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java
@@ -0,0 +1,212 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import java.io.Serializable;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.SupportsEqNotEq;
+import org.apache.parquet.filter2.predicate.Operators.SupportsLtGt;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.Operators.UserDefinedByClass;
+import org.apache.parquet.filter2.predicate.Operators.UserDefinedByInstance;
+
+/**
+ * The Filter API is expressed through these static methods.
+ *
+ * Example usage:
+ * {@code
+ *
+ *   IntColumn foo = intColumn("foo");
+ *   DoubleColumn bar = doubleColumn("x.y.bar");
+ *
+ *   // foo == 10 || bar <= 17.0
+ *   FilterPredicate pred = or(eq(foo, 10), ltEq(bar, 17.0));
+ *
+ * }
+ */
+// TODO: Support repeated columns (https://issues.apache.org/jira/browse/PARQUET-34)
+//
+// TODO: Support filtering on groups (eg, filter where this group is / isn't null)
+// TODO: (https://issues.apache.org/jira/browse/PARQUET-43)
+
+// TODO: Consider adding support for more column types that aren't coupled with parquet types, eg Column<String>
+// TODO: (https://issues.apache.org/jira/browse/PARQUET-35)
+public final class FilterApi {
+  private FilterApi() { }
+
+  public static IntColumn intColumn(String columnPath) {
+    return new IntColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static LongColumn longColumn(String columnPath) {
+    return new LongColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static FloatColumn floatColumn(String columnPath) {
+    return new FloatColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static DoubleColumn doubleColumn(String columnPath) {
+    return new DoubleColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static BooleanColumn booleanColumn(String columnPath) {
+    return new BooleanColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static BinaryColumn binaryColumn(String columnPath) {
+    return new BinaryColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  /**
+   * Keeps records if their value is equal to the provided value.
+   * Nulls are treated the same way the java programming language does.
+   * For example:
+   *   eq(column, null) will keep all records whose value is null.
+   *   eq(column, 7) will keep all records whose value is 7, and will drop records whose value is null
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsEqNotEq> Eq<T> eq(C column, T value) {
+    return new Eq<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is not equal to the provided value.
+   * Nulls are treated the same way the java programming language does.
+   * For example:
+   *   notEq(column, null) will keep all records whose value is not null.
+   *   notEq(column, 7) will keep all records whose value is not 7, including records whose value is null.
+   *
+   *   NOTE: this is different from how some query languages handle null. For example, SQL and pig will drop
+   *   nulls when you filter by not equal to 7. To achieve similar behavior in this api, do:
+   *   and(notEq(column, 7), notEq(column, null))
+   *
+   *   NOTE: be sure to read the {@link #lt}, {@link #ltEq}, {@link #gt}, {@link #gtEq} operators' docs
+   *         for how they handle nulls
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsEqNotEq> NotEq<T> notEq(C column, T value) {
+    return new NotEq<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is less than (but not equal to) the provided value.
+   * The provided value cannot be null, as less than null has no meaning.
+   * Records with null values will be dropped.
+   * For example:
+   *   lt(column, 7) will keep all records whose value is less than (but not equal to) 7, and not null.
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> Lt<T> lt(C column, T value) {
+    return new Lt<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is less than or equal to the provided value.
+   * The provided value cannot be null, as less than null has no meaning.
+   * Records with null values will be dropped.
+   * For example:
+   *   ltEq(column, 7) will keep all records whose value is less than or equal to 7, and not null.
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> LtEq<T> ltEq(C column, T value) {
+    return new LtEq<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is greater than (but not equal to) the provided value.
+   * The provided value cannot be null, as greater than null has no meaning.
+   * Records with null values will be dropped.
+   * For example:
+   *   gt(column, 7) will keep all records whose value is greater than (but not equal to) 7, and not null.
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> Gt<T> gt(C column, T value) {
+    return new Gt<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is greater than or equal to the provided value.
+   * The provided value cannot be null, as greater than null has no meaning.
+   * Records with null values will be dropped.
+   * For example:
+   *   gtEq(column, 7) will keep all records whose value is greater than or equal to 7, and not null.
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> GtEq<T> gtEq(C column, T value) {
+    return new GtEq<T>(column, value);
+  }
+
+  /**
+   * Keeps records that pass the provided {@link UserDefinedPredicate}
+   *
+   * The provided class must have a default constructor. To use an instance
+   * of a UserDefinedPredicate instead, see the {@code userDefined(column, udp)} overload below.
+   */
+  public static <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
+    UserDefined<T, U> userDefined(Column<T> column, Class<U> clazz) {
+    return new UserDefinedByClass<T, U>(column, clazz);
+  }
+  
+  /**
+   * Keeps records that pass the provided {@link UserDefinedPredicate}
+   *
+   * The provided instance of UserDefinedPredicate must be serializable.
+   */
+  public static <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable>
+    UserDefined<T, U> userDefined(Column<T> column, U udp) {
+    return new UserDefinedByInstance<T, U>(column, udp);
+  }
+
+  /**
+   * Constructs the logical and of two predicates. Records will be kept if both the left and right predicate agree
+   * that the record should be kept.
+   */
+  public static FilterPredicate and(FilterPredicate left, FilterPredicate right) {
+    return new And(left, right);
+  }
+
+  /**
+   * Constructs the logical or of two predicates. Records will be kept if either the left or right predicate
+   * is satisfied (or both).
+   */
+  public static FilterPredicate or(FilterPredicate left, FilterPredicate right) {
+    return new Or(left, right);
+  }
+
+  /**
+   * Constructs the logical not (or inverse) of a predicate.
+   * Records will be kept if the provided predicate is not satisfied.
+   */
+  public static FilterPredicate not(FilterPredicate predicate) {
+    return new Not(predicate);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
new file mode 100644
index 0000000..8afb334
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java
@@ -0,0 +1,72 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+
+/**
+ * A FilterPredicate is an expression tree describing the criteria for which records to keep when loading data from
+ * a parquet file. These predicates are applied in multiple places. Currently, they are applied to all row groups at
+ * job submission time to see if we can potentially drop entire row groups, and then they are applied during column
+ * assembly to drop individual records that are not wanted.
+ *
+ * FilterPredicates do not contain closures or instances of anonymous classes, rather they are expressed as
+ * an expression tree of operators.
+ *
+ * FilterPredicates are implemented in terms of the visitor pattern.
+ *
+ * See {@link Operators} for the implementation of the operator tokens,
+ * and {@link FilterApi} for the dsl functions for constructing an expression tree.
+ */
+public interface FilterPredicate {
+
+  /**
+   * A FilterPredicate must accept a Visitor, per the visitor pattern.
+   */
+  <R> R accept(Visitor<R> visitor);
+
+  /**
+   * A FilterPredicate Visitor must visit all the operators in a FilterPredicate expression tree,
+   * and must handle recursion itself, per the visitor pattern.
+   */
+  public static interface Visitor<R> {
+    <T extends Comparable<T>> R visit(Eq<T> eq);
+    <T extends Comparable<T>> R visit(NotEq<T> notEq);
+    <T extends Comparable<T>> R visit(Lt<T> lt);
+    <T extends Comparable<T>> R visit(LtEq<T> ltEq);
+    <T extends Comparable<T>> R visit(Gt<T> gt);
+    <T extends Comparable<T>> R visit(GtEq<T> gtEq);
+    R visit(And and);
+    R visit(Or or);
+    R visit(Not not);
+    <T extends Comparable<T>, U extends UserDefinedPredicate<T>> R visit(UserDefined<T, U> udp);
+    <T extends Comparable<T>, U extends UserDefinedPredicate<T>> R visit(LogicalNotUserDefined<T, U> udp);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
new file mode 100644
index 0000000..134a29c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java
@@ -0,0 +1,113 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+
+/**
+ * Recursively removes all use of the not() operator in a predicate
+ * by replacing all instances of not(x) with the inverse(x),
+ * eg: not(and(eq(), not(eq(y)))) -> or(notEq(), eq(y))
+ *
+ * The returned predicate should have the same meaning as the original, but
+ * without the use of the not() operator.
+ *
+ * See also {@link LogicalInverter}, which is used
+ * to do the inversion.
+ */
+public final class LogicalInverseRewriter implements Visitor<FilterPredicate> {
+  private static final LogicalInverseRewriter INSTANCE = new LogicalInverseRewriter();
+
+  public static FilterPredicate rewrite(FilterPredicate pred) {
+    checkNotNull(pred, "pred");
+    return pred.accept(INSTANCE);
+  }
+
+  private LogicalInverseRewriter() { }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Eq<T> eq) {
+    return eq;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(NotEq<T> notEq) {
+    return notEq;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Lt<T> lt) {
+    return lt;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(LtEq<T> ltEq) {
+    return ltEq;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Gt<T> gt) {
+    return gt;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(GtEq<T> gtEq) {
+    return gtEq;
+  }
+
+  @Override
+  public FilterPredicate visit(And and) {
+    return and(and.getLeft().accept(this), and.getRight().accept(this));
+  }
+
+  @Override
+  public FilterPredicate visit(Or or) {
+    return or(or.getLeft().accept(this), or.getRight().accept(this));
+  }
+
+  @Override
+  public FilterPredicate visit(Not not) {
+    return LogicalInverter.invert(not.getPredicate().accept(this));
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredicate visit(UserDefined<T, U> udp) {
+    return udp;
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
+    return udp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
new file mode 100644
index 0000000..266bb0c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java
@@ -0,0 +1,108 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Converts a {@link FilterPredicate} to its logical inverse.
+ * The returned predicate should be equivalent to not(p), but without
+ * the use of a not() operator.
+ *
+ * See also {@link LogicalInverseRewriter}, which can remove the use
+ * of all not() operators without inverting the overall predicate.
+ */
+public final class LogicalInverter implements Visitor<FilterPredicate> {
+  private static final LogicalInverter INSTANCE = new LogicalInverter();
+
+  public static FilterPredicate invert(FilterPredicate pred) {
+    checkNotNull(pred, "pred");
+    return pred.accept(INSTANCE);
+  }
+
+  private LogicalInverter() {}
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Eq<T> eq) {
+    return new NotEq<T>(eq.getColumn(), eq.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(NotEq<T> notEq) {
+    return new Eq<T>(notEq.getColumn(), notEq.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Lt<T> lt) {
+    return new GtEq<T>(lt.getColumn(), lt.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(LtEq<T> ltEq) {
+    return new Gt<T>(ltEq.getColumn(), ltEq.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Gt<T> gt) {
+    return new LtEq<T>(gt.getColumn(), gt.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(GtEq<T> gtEq) {
+    return new Lt<T>(gtEq.getColumn(), gtEq.getValue());
+  }
+
+  @Override
+  public FilterPredicate visit(And and) {
+    return new Or(and.getLeft().accept(this), and.getRight().accept(this));
+  }
+
+  @Override
+  public FilterPredicate visit(Or or) {
+    return new And(or.getLeft().accept(this), or.getRight().accept(this));
+  }
+
+  @Override
+  public FilterPredicate visit(Not not) {
+    return not.getPredicate();
+  }
+
+  @Override
+  public <T extends Comparable<T>,  U extends UserDefinedPredicate<T>> FilterPredicate visit(UserDefined<T, U> udp) {
+    return new LogicalNotUserDefined<T, U>(udp);
+  }
+
+  @Override
+  public <T extends Comparable<T>,  U extends UserDefinedPredicate<T>> FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
+    return udp.getUserDefined();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
new file mode 100644
index 0000000..32b4430
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
@@ -0,0 +1,526 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import java.io.Serializable;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * These are the operators in a filter predicate expression tree.
+ * They are constructed by using the methods in {@link FilterApi}
+ */
+public final class Operators {
+  private Operators() { }
+
+  public static abstract class Column<T extends Comparable<T>> implements Serializable {
+    private final ColumnPath columnPath;
+    private final Class<T> columnType;
+
+    protected Column(ColumnPath columnPath, Class<T> columnType) {
+      checkNotNull(columnPath, "columnPath");
+      checkNotNull(columnType, "columnType");
+      this.columnPath = columnPath;
+      this.columnType = columnType;
+    }
+
+    public Class<T> getColumnType() {
+      return columnType;
+    }
+
+    public ColumnPath getColumnPath() {
+      return columnPath;
+    }
+
+    @Override
+    public String toString() {
+      return "column(" + columnPath.toDotString() + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Column column = (Column) o;
+
+      if (!columnType.equals(column.columnType)) return false;
+      if (!columnPath.equals(column.columnPath)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = columnPath.hashCode();
+      result = 31 * result + columnType.hashCode();
+      return result;
+    }
+  }
+
+  public static interface SupportsEqNotEq { } // marker for columns that can be used with eq() and notEq()
+  public static interface SupportsLtGt extends SupportsEqNotEq { } // marker for columns that can be used with lt(), ltEq(), gt(), gtEq()
+
+  public static final class IntColumn extends Column<Integer> implements SupportsLtGt {
+    IntColumn(ColumnPath columnPath) {
+      super(columnPath, Integer.class);
+    }
+  }
+
+  public static final class LongColumn extends Column<Long> implements SupportsLtGt {
+    LongColumn(ColumnPath columnPath) {
+      super(columnPath, Long.class);
+    }
+  }
+
+  public static final class DoubleColumn extends Column<Double> implements SupportsLtGt {
+    DoubleColumn(ColumnPath columnPath) {
+      super(columnPath, Double.class);
+    }
+  }
+
+  public static final class FloatColumn extends Column<Float> implements SupportsLtGt {
+    FloatColumn(ColumnPath columnPath) {
+      super(columnPath, Float.class);
+    }
+  }
+
+  public static final class BooleanColumn extends Column<Boolean> implements SupportsEqNotEq {
+    BooleanColumn(ColumnPath columnPath) {
+      super(columnPath, Boolean.class);
+    }
+  }
+
+  public static final class BinaryColumn extends Column<Binary> implements SupportsLtGt {
+    BinaryColumn(ColumnPath columnPath) {
+      super(columnPath, Binary.class);
+    }
+  }
+
+  // base class for Eq, NotEq, Lt, Gt, LtEq, GtEq
+  static abstract class ColumnFilterPredicate<T extends Comparable<T>> implements FilterPredicate, Serializable  {
+    private final Column<T> column;
+    private final T value;
+    private final String toString;
+
+    protected ColumnFilterPredicate(Column<T> column, T value) {
+      this.column = checkNotNull(column, "column");
+
+      // Eq and NotEq allow value to be null, Lt, Gt, LtEq, GtEq however do not, so they guard against
+      // null in their own constructors.
+      this.value = value;
+
+      String name = getClass().getSimpleName().toLowerCase();
+      this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + value + ")";
+    }
+
+    public Column<T> getColumn() {
+      return column;
+    }
+
+    public T getValue() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ColumnFilterPredicate that = (ColumnFilterPredicate) o;
+
+      if (!column.equals(that.column)) return false;
+      if (value != null ? !value.equals(that.value) : that.value != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = column.hashCode();
+      result = 31 * result + (value != null ? value.hashCode() : 0);
+      result = 31 * result + getClass().hashCode();
+      return result;
+    }
+  }
+
+  public static final class Eq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value can be null
+    Eq(Column<T> column, T value) {
+      super(column, value);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+
+  }
+
+  public static final class NotEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value can be null
+    NotEq(Column<T> column, T value) {
+      super(column, value);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+
+  public static final class Lt<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value cannot be null
+    Lt(Column<T> column, T value) {
+      super(column, checkNotNull(value, "value"));
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  public static final class LtEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value cannot be null
+    LtEq(Column<T> column, T value) {
+      super(column, checkNotNull(value, "value"));
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+
+  public static final class Gt<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value cannot be null
+    Gt(Column<T> column, T value) {
+      super(column, checkNotNull(value, "value"));
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  public static final class GtEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value cannot be null
+    GtEq(Column<T> column, T value) {
+      super(column, checkNotNull(value, "value"));
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  // base class for And, Or
+  private static abstract class BinaryLogicalFilterPredicate implements FilterPredicate, Serializable {
+    private final FilterPredicate left;
+    private final FilterPredicate right;
+    private final String toString;
+
+    protected BinaryLogicalFilterPredicate(FilterPredicate left, FilterPredicate right) {
+      this.left = checkNotNull(left, "left");
+      this.right = checkNotNull(right, "right");
+      String name = getClass().getSimpleName().toLowerCase();
+      this.toString = name + "(" + left + ", " + right + ")";
+    }
+
+    public FilterPredicate getLeft() {
+      return left;
+    }
+
+    public FilterPredicate getRight() {
+      return right;
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      BinaryLogicalFilterPredicate that = (BinaryLogicalFilterPredicate) o;
+
+      if (!left.equals(that.left)) return false;
+      if (!right.equals(that.right)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = left.hashCode();
+      result = 31 * result + right.hashCode();
+      result = 31 * result + getClass().hashCode();
+      return result;
+    }
+  }
+
+  public static final class And extends BinaryLogicalFilterPredicate {
+
+    And(FilterPredicate left, FilterPredicate right) {
+      super(left, right);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  /**
+   * Binary predicate node named "Or"; its string form is {@code or(left, right)}.
+   */
+  public static final class Or extends BinaryLogicalFilterPredicate {
+
+    // package-private: not constructible from outside this package
+    Or(FilterPredicate left, FilterPredicate right) {
+      super(left, right);
+    }
+
+    // dispatches to the visitor overload that takes an Or
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  /**
+   * Predicate node wrapping a single predicate; its string form is {@code not(predicate)}.
+   */
+  public static class Not implements FilterPredicate, Serializable {
+    private final FilterPredicate predicate;
+    private final String toString;
+
+    Not(FilterPredicate predicate) {
+      this.predicate = checkNotNull(predicate, "predicate");
+      // string form is built once here and returned by toString()
+      this.toString = "not(" + predicate + ")";
+    }
+
+    /** Returns the wrapped predicate. */
+    public FilterPredicate getPredicate() {
+      return predicate;
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+
+    /** Equal when the other object has the exact same class and an equal wrapped predicate. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      return predicate.equals(((Not) o).predicate);
+    }
+
+    /** Combines the hash of the wrapped predicate with the hash of the concrete class. */
+    @Override
+    public int hashCode() {
+      int result = predicate.hashCode();
+      result = 31 * result + getClass().hashCode();
+      return result;
+    }
+  }
+
+  /**
+   * Base class for predicates that pair a column with a {@link UserDefinedPredicate}.
+   * Subclasses decide how the predicate object is obtained.
+   *
+   * @param <T> the type of the column's values
+   * @param <U> the type of the user defined predicate
+   */
+  public static abstract class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
+    // the column this predicate applies to; validated with checkNotNull in the constructor
+    protected final Column<T> column;
+
+    UserDefined(Column<T> column) {
+      this.column = checkNotNull(column, "column");
+    }
+
+    /** Returns the column this predicate applies to. */
+    public Column<T> getColumn() {
+      return column;
+    }
+
+    /** Returns the user defined predicate associated with the column. */
+    public abstract U getUserDefinedPredicate();
+
+    // dispatches to the visitor overload that takes a UserDefined
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+    
+  /**
+   * A {@link UserDefined} predicate identified by the class of its {@link UserDefinedPredicate}.
+   * A new instance of that class is created reflectively on every call to
+   * {@link #getUserDefinedPredicate()}.
+   *
+   * @param <T> the type of the column's values
+   * @param <U> the type of the user defined predicate
+   */
+  public static final class UserDefinedByClass<T extends Comparable<T>, U extends UserDefinedPredicate<T>> extends UserDefined<T, U> {
+    private final Class<U> udpClass;
+    private final String toString;
+    private static final String INSTANTIATION_ERROR_MESSAGE =
+        "Could not instantiate custom filter: %s. User defined predicates must be static classes with a default constructor.";
+
+    /**
+     * @param column the column the predicate applies to
+     * @param udpClass the predicate class, validated with checkNotNull
+     * @throws RuntimeException if the class cannot be instantiated reflectively
+     */
+    UserDefinedByClass(Column<T> column, Class<U> udpClass) {
+      super(column);
+      this.udpClass = checkNotNull(udpClass, "udpClass");
+      // Lower-case with a fixed locale so the name never depends on the JVM's default
+      // locale (some locales, e.g. Turkish, lower-case 'I' to a dotless i).
+      String name = getClass().getSimpleName().toLowerCase(java.util.Locale.ROOT);
+      this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")";
+
+      // defensively try to instantiate the class early to make sure that it's possible
+      getUserDefinedPredicate();
+    }
+
+    /** Returns the class used to create the user defined predicate. */
+    public Class<U> getUserDefinedPredicateClass() {
+      return udpClass;
+    }
+
+    /**
+     * Creates a new instance of the predicate class via {@link Class#newInstance()}.
+     *
+     * @throws RuntimeException wrapping the reflective failure if instantiation is not possible
+     */
+    @Override
+    public U getUserDefinedPredicate() {
+      try {
+        return udpClass.newInstance();
+      } catch (InstantiationException e) {
+        throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    /** Equal when the other object has the exact same class, an equal column and the same predicate class. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      // wildcard cast instead of a raw type; the type arguments are not needed for comparison
+      UserDefinedByClass<?, ?> that = (UserDefinedByClass<?, ?>) o;
+      return column.equals(that.column) && udpClass.equals(that.udpClass);
+    }
+
+    /** Combines the hashes of the column, the predicate class and the concrete class. */
+    @Override
+    public int hashCode() {
+      int result = column.hashCode();
+      result = 31 * result + udpClass.hashCode();
+      result = result * 31 + getClass().hashCode();
+      return result;
+    }
+  }
+  
+  /**
+   * A {@link UserDefined} predicate backed by a concrete, serializable
+   * {@link UserDefinedPredicate} instance, which is returned as-is by
+   * {@link #getUserDefinedPredicate()}.
+   *
+   * @param <T> the type of the column's values
+   * @param <U> the type of the user defined predicate
+   */
+  public static final class UserDefinedByInstance<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> extends UserDefined<T, U> {
+    private final String toString;
+    private final U udpInstance;
+
+    /**
+     * @param column the column the predicate applies to
+     * @param udpInstance the predicate instance, validated with checkNotNull
+     */
+    UserDefinedByInstance(Column<T> column, U udpInstance) {
+      super(column);
+      this.udpInstance = checkNotNull(udpInstance, "udpInstance");
+      // Lower-case with a fixed locale: with the default-locale overload, a Turkish default
+      // locale would turn the capital 'I' of this class name into a dotless i.
+      String name = getClass().getSimpleName().toLowerCase(java.util.Locale.ROOT);
+      this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpInstance + ")";
+    }
+
+    /** Returns the predicate instance supplied at construction time. */
+    @Override
+    public U getUserDefinedPredicate() {
+      return udpInstance;
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    /** Equal when the other object has the exact same class, an equal column and an equal predicate instance. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      // wildcard cast instead of a raw type; the type arguments are not needed for comparison
+      UserDefinedByInstance<?, ?> that = (UserDefinedByInstance<?, ?>) o;
+      return column.equals(that.column) && udpInstance.equals(that.udpInstance);
+    }
+
+    /** Combines the hashes of the column, the predicate instance and the concrete class. */
+    @Override
+    public int hashCode() {
+      int result = column.hashCode();
+      result = 31 * result + udpInstance.hashCode();
+      result = result * 31 + getClass().hashCode();
+      return result;
+    }
+  }
+
+  /**
+   * Represents the inverse of a {@link UserDefined}. It is equivalent to not(userDefined),
+   * without the use of the not() operator. Its string form is {@code inverted(userDefined)}.
+   */
+  public static final class LogicalNotUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
+    private final UserDefined<T, U> udp;
+    private final String toString;
+
+    LogicalNotUserDefined(UserDefined<T, U> userDefined) {
+      this.udp = checkNotNull(userDefined, "userDefined");
+      // string form is built once here and returned by toString()
+      this.toString = "inverted(" + udp + ")";
+    }
+
+    /** Returns the user defined predicate being inverted. */
+    public UserDefined<T, U> getUserDefined() {
+      return udp;
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    /** Equal when the other object has the exact same class and wraps an equal predicate. */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      LogicalNotUserDefined<?, ?> other = (LogicalNotUserDefined<?, ?>) o;
+      return udp.equals(other.udp);
+    }
+
+    /** Combines the hash of the wrapped predicate with the hash of the concrete class. */
+    @Override
+    public int hashCode() {
+      return 31 * udp.hashCode() + getClass().hashCode();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
new file mode 100644
index 0000000..d473841
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
@@ -0,0 +1,193 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.ColumnFilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Inspects the column types found in the provided {@link FilterPredicate} and compares them
+ * to the actual schema found in the parquet file. If the provided predicate's types are
+ * not consistent with the file schema, an IllegalArgumentException is thrown.
+ *
+ * Ideally, all this would be checked at compile time, and this class wouldn't be needed.
+ * If we can come up with a way to do that, we should.
+ *
+ * This class is stateful, cannot be reused, and is not thread safe.
+ *
+ * TODO: detect if a column is optional or required and validate that eq(null)
+ * TODO: is not called on required fields (is that too strict?)
+ * TODO: (https://issues.apache.org/jira/browse/PARQUET-44)
+ */
+public class SchemaCompatibilityValidator implements FilterPredicate.Visitor<Void> {
+
+  /**
+   * Walks the predicate with a fresh validator built from the schema.
+   *
+   * @param predicate the predicate to check
+   * @param schema the file schema the predicate is checked against
+   * @throws IllegalArgumentException if a column is used with conflicting types, is missing
+   *         from the schema, or is repeated
+   */
+  public static void validate(FilterPredicate predicate, MessageType schema) {
+    checkNotNull(predicate, "predicate");
+    checkNotNull(schema, "schema");
+    SchemaCompatibilityValidator validator = new SchemaCompatibilityValidator(schema);
+    predicate.accept(validator);
+  }
+
+  // A map of column name to the type the user supplied for this column.
+  // Used to validate that the user did not provide different types for the same
+  // column.
+  private final Map<ColumnPath, Class<?>> columnTypesEncountered = new HashMap<ColumnPath, Class<?>>();
+
+  // the columns (keyed by path) according to the file's schema. This is the source of truth, and
+  // we are validating that what the user provided agrees with these.
+  private final Map<ColumnPath, ColumnDescriptor> columnsAccordingToSchema = new HashMap<ColumnPath, ColumnDescriptor>();
+
+  // the original type of a column, keyed by path
+  private final Map<ColumnPath, OriginalType> originalTypes = new HashMap<ColumnPath, OriginalType>();
+
+  // indexes every column of the schema by path, along with its original type when it has one
+  private SchemaCompatibilityValidator(MessageType schema) {
+    for (ColumnDescriptor descriptor : schema.getColumns()) {
+      ColumnPath path = ColumnPath.get(descriptor.getPath());
+      columnsAccordingToSchema.put(path, descriptor);
+
+      OriginalType originalType = schema.getType(descriptor.getPath()).getOriginalType();
+      if (originalType == null) {
+        continue;
+      }
+      originalTypes.put(path, originalType);
+    }
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(Eq<T> eq) {
+    validateColumnFilterPredicate(eq);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(NotEq<T> notEq) {
+    validateColumnFilterPredicate(notEq);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(Lt<T> lt) {
+    validateColumnFilterPredicate(lt);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(LtEq<T> ltEq) {
+    validateColumnFilterPredicate(ltEq);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(Gt<T> gt) {
+    validateColumnFilterPredicate(gt);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(GtEq<T> gtEq) {
+    validateColumnFilterPredicate(gtEq);
+    return null;
+  }
+
+  // logical operators carry no column themselves: validate the left operand, then the right
+  @Override
+  public Void visit(And and) {
+    FilterPredicate left = and.getLeft();
+    FilterPredicate right = and.getRight();
+    left.accept(this);
+    right.accept(this);
+    return null;
+  }
+
+  @Override
+  public Void visit(Or or) {
+    FilterPredicate left = or.getLeft();
+    FilterPredicate right = or.getRight();
+    left.accept(this);
+    right.accept(this);
+    return null;
+  }
+
+  @Override
+  public Void visit(Not not) {
+    FilterPredicate negated = not.getPredicate();
+    negated.accept(this);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Void visit(UserDefined<T, U> udp) {
+    validateColumn(udp.getColumn());
+    return null;
+  }
+
+  // an inverted user defined predicate is validated exactly like the predicate it wraps
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Void visit(LogicalNotUserDefined<T, U> udp) {
+    UserDefined<T, U> wrapped = udp.getUserDefined();
+    return wrapped.accept(this);
+  }
+
+  private <T extends Comparable<T>> void validateColumnFilterPredicate(ColumnFilterPredicate<T> pred) {
+    validateColumn(pred.getColumn());
+  }
+
+  // checks one column reference: consistent type within the predicate, present in the
+  // schema, not repeated, and of a type that is valid for the schema's column type
+  private <T extends Comparable<T>> void validateColumn(Column<T> column) {
+    ColumnPath path = column.getColumnPath();
+    Class<?> providedType = column.getColumnType();
+
+    Class<?> previousType = columnTypesEncountered.get(path);
+    if (previousType == null) {
+      // first time this column is seen: remember the type the user supplied
+      columnTypesEncountered.put(path, providedType);
+    } else if (!previousType.equals(providedType)) {
+      throw new IllegalArgumentException("Column: "
+          + path.toDotString()
+          + " was provided with different types in the same predicate."
+          + " Found both: (" + previousType + ", " + providedType + ")");
+    }
+
+    ColumnDescriptor descriptor = getColumnDescriptor(path);
+
+    boolean repeated = descriptor.getMaxRepetitionLevel() > 0;
+    if (repeated) {
+      throw new IllegalArgumentException("FilterPredicates do not currently support repeated columns. "
+          + "Column " + path.toDotString() + " is repeated.");
+    }
+
+    ValidTypeMap.assertTypeValid(column, descriptor.getType(), originalTypes.get(path));
+  }
+
+  // looks the column up in the schema index, rejecting paths the schema does not contain
+  private ColumnDescriptor getColumnDescriptor(ColumnPath columnPath) {
+    ColumnDescriptor found = columnsAccordingToSchema.get(columnPath);
+    checkArgument(found != null, "Column " + columnPath + " was not found in schema!");
+    return found;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java
new file mode 100644
index 0000000..22e4027
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java
@@ -0,0 +1,42 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Contains statistics about a group of records: a minimum and a maximum value.
+ *
+ * @param <T> the type of the min and max values
+ */
+public class Statistics<T> {
+  // both values are validated with checkNotNull in the constructor
+  private final T min;
+  private final T max;
+
+  /**
+   * @param min the minimum value, validated with checkNotNull
+   * @param max the maximum value, validated with checkNotNull
+   */
+  public Statistics(T min, T max) {
+    this.min = checkNotNull(min, "min");
+    this.max = checkNotNull(max, "max");
+  }
+
+  /** Returns the minimum value supplied at construction time. */
+  public T getMin() {
+    return min;
+  }
+
+  /** Returns the maximum value supplied at construction time. */
+  public T getMax() {
+    return max;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/UserDefinedPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/UserDefinedPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/UserDefinedPredicate.java
new file mode 100644
index 0000000..16b7c3d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/UserDefinedPredicate.java
@@ -0,0 +1,108 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+/**
+ * A UserDefinedPredicate decides whether a record should be kept or dropped, first by
+ * inspecting meta data about a group of records to see if the entire group can be dropped,
+ * then by inspecting actual values of a single column. These predicates can be combined into
+ * a complex boolean expression via the {@link FilterApi}.
+ *
+ * @param <T> The type of the column this predicate is applied to.
+ */
+// TODO: consider avoiding autoboxing and adding the specialized methods for each type
+// TODO: downside is that's fairly unwieldy for users
+public abstract class UserDefinedPredicate<T extends Comparable<T>> {
+
+  /**
+   * A udp must have a default constructor.
+   * The udp passed to {@link FilterApi} will not be serialized along with its state.
+   * Only its class name will be recorded, it will be instantiated reflectively via the default
+   * constructor.
+   */
+  public UserDefinedPredicate() { }
+
+  /**
+   * Return true to keep the record with this value, false to drop it.
+   */
+  public abstract boolean keep(T value);
+
+  /**
+   * Given information about a group of records (eg, the min and max value)
+   * Return true to drop all the records in this group, false to keep them for further
+   * inspection. Returning false here will cause the records to be loaded and each value
+   * will be passed to {@link #keep} to make the final decision.
+   *
+   * It is safe to always return false here, if you simply want to visit each record via the {@link #keep} method,
+   * though it is much more efficient to drop entire chunks of records here if you can.
+   */
+  public abstract boolean canDrop(Statistics<T> statistics);
+
+  /**
+   * Same as {@link #canDrop} except this method describes the logical inverse
+   * behavior of this predicate. If this predicate is passed to the not() operator, then
+   * {@link #inverseCanDrop} will be called instead of {@link #canDrop}
+   *
+   * It is safe to always return false here, if you simply want to visit each record via the {@link #keep} method,
+   * though it is much more efficient to drop entire chunks of records here if you can.
+   *
+   * It may be valid to simply return !canDrop(statistics) but that is not always the case.
+   * To illustrate, look at this re-implementation of a UDP that checks for values greater than 7:
+   *
+   * {@code
+   *
+   * // This is just an example, you should use the built in {@link FilterApi#gt} operator instead of
+   * // implementing your own like this.
+   *
+   * public class IntGreaterThan7UDP extends UserDefinedPredicate<Integer> {
+   *   @Override
+   *   public boolean keep(Integer value) {
+   *     // here we just check if the value is greater than 7.
+   *     // here, parquet knows that if the predicate not(columnX, IntGreaterThan7UDP) is being evaluated,
+   *     // it is safe to simply use !IntGreaterThan7UDP.keep(value)
+   *     return value > 7;
+   *   }
+   *
+   *   @Override
+   *   public boolean canDrop(Statistics<Integer> statistics) {
+   *     // here we drop a group of records if they are all less than or equal to 7,
+   *     // (there can't possibly be any values greater than 7 in this group of records)
+   *     return statistics.getMax() <= 7;
+   *   }
+   *
+   *   @Override
+   *   public boolean inverseCanDrop(Statistics<Integer> statistics) {
+   *     // here the predicate not(columnX, IntGreaterThan7UDP) is being evaluated, which means we want
+   *     // to keep all records whose value is not greater than 7, or, rephrased, whose value is less than or equal to 7.
+   *     // notice what would happen if parquet just tried to evaluate !IntGreaterThan7UDP.canDrop():
+   *     // !IntGreaterThan7UDP.canDrop(stats) == !(stats.getMax() <= 7) == (stats.getMax() > 7)
+   *     // it would drop the following group of records: [100, 1, 2, 3], even though this group of records contains values
+   *     // less than or equal to 7.
+   *
+   *     // what we actually want to do is drop groups of records where the *min* is greater than 7, (not the max)
+   *     // for example: the group of records: [100, 8, 9, 10] has a min of 8, so there's no way there are going
+   *     // to be records with a value
+   *     // less than or equal to 7 in this group.
+   *     return statistics.getMin() > 7;
+   *   }
+   * }
+   * }
+   */
+  public abstract boolean inverseCanDrop(Statistics<T> statistics);
+}


Mime
View raw message