drill-commits mailing list archives

From par...@apache.org
Subject [09/10] drill git commit: DRILL-1950: Parquet rowgroup level filter pushdown in query planning time.
Date Sat, 05 Nov 2016 00:11:36 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
index b613707..37a57dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,268 +15,281 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.drill.common.expression.BooleanOperator;
-import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FunctionHolderExpression;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.fn.CastFunctions;
+import org.apache.drill.common.expression.fn.FuncHolder;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder;
+import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.expr.stat.ParquetPredicates;
+import org.apache.drill.exec.expr.stat.TypedFieldExpr;
+import org.apache.drill.exec.ops.UdfUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.parquet.filter2.predicate.FilterApi;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
 
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
-public class ParquetFilterBuilder extends
-        AbstractExprVisitor<FilterPredicate, Void, RuntimeException> {
-    static final Logger logger = LoggerFactory
-            .getLogger(ParquetFilterBuilder.class);
-    private LogicalExpression le;
-    private boolean allExpressionsConverted = true;
-    private ParquetGroupScan groupScan;
-
-    public ParquetFilterBuilder(ParquetGroupScan groupScan, LogicalExpression conditionExp) {
-        this.le = conditionExp;
-        this.groupScan = groupScan;
+/**
+ * A visitor that walks a materialized logical expression and builds a ParquetFilterPredicate.
+ * If a visitXXX method returns null, the corresponding filter branch is not qualified for pushdown.
+ */
+public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression, Set<LogicalExpression>, RuntimeException> {
+  static final Logger logger = LoggerFactory.getLogger(ParquetFilterBuilder.class);
+
+  private final UdfUtilities udfUtilities;
+
+  /**
+   * @param expr materialized filter expression
+   * @param constantBoundaries set of constant expressions
+   * @param udfUtilities utilities used to evaluate constant expressions at planning time
+   */
+  public static LogicalExpression buildParquetFilterPredicate(LogicalExpression expr, final Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities) {
+    final LogicalExpression predicate = expr.accept(new ParquetFilterBuilder(udfUtilities), constantBoundaries);
+    return predicate;
+  }
+
+  private ParquetFilterBuilder(UdfUtilities udfUtilities) {
+    this.udfUtilities = udfUtilities;
+  }
+
+  @Override
+  public LogicalExpression visitUnknown(LogicalExpression e, Set<LogicalExpression> value) {
+    if (e instanceof TypedFieldExpr &&
+        ! containsArraySeg(((TypedFieldExpr) e).getPath()) &&
+        e.getMajorType().getMode() != TypeProtos.DataMode.REPEATED) {
+      // A field reference is qualified for pushdown only if it
+      // 1. does not contain an array segment (a.b[1], a.b[1].c.d), and
+      // 2. is not of repeated type.
+      return e;
     }
 
-    public ParquetGroupScan parseTree() {
-        FilterPredicate predicate = le.accept(this, null);
-        try {
-            return this.groupScan.clone(predicate);
-        } catch (IOException e) {
-            logger.error("Failed to set Parquet filter", e);
-            return null;
+    return null;
+  }
+
+  @Override
+  public LogicalExpression visitIntConstant(ValueExpressions.IntExpression intExpr, Set<LogicalExpression> value)
+      throws RuntimeException {
+    return intExpr;
+  }
+
+  @Override
+  public LogicalExpression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Set<LogicalExpression> value)
+      throws RuntimeException {
+    return dExpr;
+  }
+
+  @Override
+  public LogicalExpression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Set<LogicalExpression> value)
+      throws RuntimeException {
+    return fExpr;
+  }
+
+  @Override
+  public LogicalExpression visitLongConstant(ValueExpressions.LongExpression intExpr, Set<LogicalExpression> value)
+      throws RuntimeException {
+    return intExpr;
+  }
+
+  @Override
+  public LogicalExpression visitDateConstant(ValueExpressions.DateExpression dateExpr, Set<LogicalExpression> value) throws RuntimeException {
+    return dateExpr;
+  }
+
+  @Override
+  public LogicalExpression visitTimeStampConstant(ValueExpressions.TimeStampExpression tsExpr, Set<LogicalExpression> value) throws RuntimeException {
+    return tsExpr;
+  }
+
+  @Override
+  public LogicalExpression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Set<LogicalExpression> value) throws RuntimeException {
+    return timeExpr;
+  }
+
+  @Override
+  public LogicalExpression visitBooleanOperator(BooleanOperator op, Set<LogicalExpression> value) {
+    List<LogicalExpression> childPredicates = new ArrayList<>();
+    String functionName = op.getName();
+
+    for (LogicalExpression arg : op.args) {
+      LogicalExpression childPredicate = arg.accept(this, value);
+      if (childPredicate == null) {
+        if (functionName.equals("booleanOr")) {
+          // we can't include any leg of the OR if any of the predicates cannot be converted
+          return null;
         }
+      } else {
+        childPredicates.add(childPredicate);
+      }
     }
 
-    public boolean areAllExpressionsConverted() {
-        return allExpressionsConverted;
+    if (childPredicates.size() == 0) {
+      return null; // no leg is qualified; return null.
+    } else if (childPredicates.size() == 1) {
+      return childPredicates.get(0); // only one leg is qualified; drop the boolean operator.
+    } else {
+      if (functionName.equals("booleanOr")) {
+        return new ParquetPredicates.OrPredicate(op.getName(), childPredicates, op.getPosition());
+      } else {
+        return new ParquetPredicates.AndPredicate(op.getName(), childPredicates, op.getPosition());
+      }
     }
+  }
 
-    @Override
-    public FilterPredicate visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
-        allExpressionsConverted = false;
-        return null;
+  private boolean containsArraySeg(final SchemaPath schemaPath) {
+    PathSegment seg = schemaPath.getRootSegment();
+
+    while (seg != null) {
+      if (seg.isArray()) {
+        return true;
+      }
+      seg = seg.getChild();
     }
+    return false;
+  }
 
-    @Override
-    public FilterPredicate visitBooleanOperator(BooleanOperator op, Void value) {
-        List<LogicalExpression> args = op.args;
-        FilterPredicate nodePredicate = null;
-        String functionName = op.getName();
-        for (LogicalExpression arg : args) {
-            switch (functionName) {
-                case "booleanAnd":
-                case "booleanOr":
-                    if (nodePredicate == null) {
-                        nodePredicate = arg.accept(this, null);
-                    } else {
-                        FilterPredicate predicate = arg.accept(this, null);
-                        if (predicate != null) {
-                            nodePredicate = mergePredicates(functionName, nodePredicate, predicate);
-                        } else {
-                            // we can't include any part of the OR if any of the predicates cannot be converted
-                            if (functionName == "booleanOr") {
-                                nodePredicate = null;
-                            }
-                            allExpressionsConverted = false;
-                        }
-                    }
-                    break;
-            }
-        }
-        return nodePredicate;
+  private LogicalExpression getValueExpressionFromConst(ValueHolder holder, TypeProtos.MinorType type) {
+    switch (type) {
+    case INT:
+      return ValueExpressions.getInt(((IntHolder) holder).value);
+    case BIGINT:
+      return ValueExpressions.getBigInt(((BigIntHolder) holder).value);
+    case FLOAT4:
+      return ValueExpressions.getFloat4(((Float4Holder) holder).value);
+    case FLOAT8:
+      return ValueExpressions.getFloat8(((Float8Holder) holder).value);
+    case DATE:
+      return ValueExpressions.getDate(((DateHolder) holder).value);
+    case TIMESTAMP:
+      return ValueExpressions.getTimeStamp(((TimeStampHolder) holder).value);
+    case TIME:
+      return ValueExpressions.getTime(((TimeHolder) holder).value);
+    default:
+      return null;
     }
+  }
 
-    private FilterPredicate mergePredicates(String functionName,
-                                            FilterPredicate leftPredicate, FilterPredicate rightPredicate) {
-        if (leftPredicate != null && rightPredicate != null) {
-            if (functionName == "booleanAnd") {
-                return FilterApi.and(leftPredicate, rightPredicate);
-            }
-            else {
-                return FilterApi.or(leftPredicate, rightPredicate);
-            }
-        } else {
-            allExpressionsConverted = false;
-            if ("booleanAnd".equals(functionName)) {
-                return leftPredicate == null ? rightPredicate : leftPredicate;
-            }
-        }
+  @Override
+  public LogicalExpression visitFunctionHolderExpression(FunctionHolderExpression funcHolderExpr, Set<LogicalExpression> value)
+      throws RuntimeException {
+    FuncHolder holder = funcHolderExpr.getHolder();
 
+    if (! (holder instanceof DrillSimpleFuncHolder)) {
+      return null;
+    }
+
+    if (value.contains(funcHolderExpr)) {
+      ValueHolder result;
+      try {
+        result = InterpreterEvaluator.evaluateConstantExpr(udfUtilities, funcHolderExpr);
+      } catch (Exception e) {
+        logger.warn("Error in evaluating function of {}", funcHolderExpr.getName());
         return null;
+      }
+
+      logger.debug("Reduce a constant function expression into a value expression");
+      return getValueExpressionFromConst(result, funcHolderExpr.getMajorType().getMinorType());
     }
 
-    @Override
-    public FilterPredicate visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
-        FilterPredicate predicate = null;
-        String functionName = call.getName();
-        ImmutableList<LogicalExpression> args = call.args;
-
-        if (ParquetCompareFunctionProcessor.isCompareFunction(functionName)) {
-            ParquetCompareFunctionProcessor processor = ParquetCompareFunctionProcessor
-                    .process(call);
-            if (processor.isSuccess()) {
-                try {
-                    predicate = createFilterPredicate(processor.getFunctionName(),
-                            processor.getPath(), processor.getValue());
-                } catch (Exception e) {
-                    logger.error("Failed to create Parquet filter", e);
-                }
-            }
-        } else {
-            switch (functionName) {
-                case "booleanAnd":
-                case "booleanOr":
-                    FilterPredicate leftPredicate = args.get(0).accept(this, null);
-                    FilterPredicate rightPredicate = args.get(1).accept(this, null);
-                    predicate = mergePredicates(functionName, leftPredicate, rightPredicate);
-                    break;
-            }
-        }
+    final String funcName = ((DrillSimpleFuncHolder) holder).getRegisteredNames()[0];
+
+    if (isCompareFunction(funcName)) {
+      return handleCompareFunction(funcHolderExpr, value);
+    }
 
-        if (predicate == null) {
-            allExpressionsConverted = false;
+    if (CastFunctions.isCastFunction(funcName)) {
+      List<LogicalExpression> newArgs = new ArrayList<>();
+      for (LogicalExpression arg : funcHolderExpr.args) {
+        final LogicalExpression newArg = arg.accept(this, value);
+        if (newArg == null) {
+          return null;
         }
+        newArgs.add(newArg);
+      }
 
-        return predicate;
+      return funcHolderExpr.copy(newArgs);
+    } else {
+      return null;
     }
+  }
 
-    private FilterPredicate createFilterPredicate(String functionName,
-                                                  SchemaPath field, Object fieldValue) {
-        FilterPredicate filter = null;
-
-        // extract the field name
-        String fieldName = field.getAsUnescapedPath();
-        switch (functionName) {
-            case "equal":
-                if (fieldValue instanceof Long) {
-                    filter = FilterApi.eq(FilterApi.longColumn(fieldName), (Long) fieldValue);
-                }
-                else if (fieldValue instanceof Integer) {
-                    filter = FilterApi.eq(FilterApi.intColumn(fieldName), (Integer) fieldValue);
-                }
-                else if (fieldValue instanceof Float) {
-                    filter = FilterApi.eq(FilterApi.floatColumn(fieldName), (Float) fieldValue);
-                }
-                else if (fieldValue instanceof Double) {
-                    filter = FilterApi.eq(FilterApi.doubleColumn(fieldName), (Double) fieldValue);
-                }
-                else if (fieldValue instanceof Boolean) {
-                    filter = FilterApi.eq(FilterApi.booleanColumn(fieldName), (Boolean) fieldValue);
-                }
-                break;
-            case "not_equal":
-                if (fieldValue instanceof Long) {
-                    filter = FilterApi.notEq(FilterApi.longColumn(fieldName), (Long) fieldValue);
-                }
-                else if (fieldValue instanceof Integer) {
-                    filter = FilterApi.notEq(FilterApi.intColumn(fieldName), (Integer) fieldValue);
-                }
-                else if (fieldValue instanceof Float) {
-                    filter = FilterApi.notEq(FilterApi.floatColumn(fieldName), (Float) fieldValue);
-                }
-                else if (fieldValue instanceof Double) {
-                    filter = FilterApi.notEq(FilterApi.doubleColumn(fieldName), (Double) fieldValue);
-                }
-                else if (fieldValue instanceof Boolean) {
-                    filter = FilterApi.notEq(FilterApi.booleanColumn(fieldName), (Boolean) fieldValue);
-                }
-                break;
-            case "greater_than_or_equal_to":
-                if (fieldValue instanceof Long) {
-                    filter = FilterApi.gtEq(FilterApi.longColumn(fieldName), (Long) fieldValue);
-                }
-                else if (fieldValue instanceof Integer) {
-                    filter = FilterApi.gtEq(FilterApi.intColumn(fieldName), (Integer) fieldValue);
-                }
-                else if (fieldValue instanceof Float) {
-                    filter = FilterApi.gtEq(FilterApi.floatColumn(fieldName), (Float) fieldValue);
-                }
-                else if (fieldValue instanceof Double) {
-                    filter = FilterApi.gtEq(FilterApi.doubleColumn(fieldName), (Double) fieldValue);
-                }
-                break;
-            case "greater_than":
-                if (fieldValue instanceof Long) {
-                    filter = FilterApi.gt(FilterApi.longColumn(fieldName), (Long) fieldValue);
-                }
-                else if (fieldValue instanceof Integer) {
-                    filter = FilterApi.gt(FilterApi.intColumn(fieldName), (Integer) fieldValue);
-                }
-                else if (fieldValue instanceof Float) {
-                    filter = FilterApi.gt(FilterApi.floatColumn(fieldName), (Float) fieldValue);
-                }
-                else if (fieldValue instanceof Double) {
-                    filter = FilterApi.gt(FilterApi.doubleColumn(fieldName), (Double) fieldValue);
-                }
-                break;
-            case "less_than_or_equal_to":
-                if (fieldValue instanceof Long) {
-                    filter = FilterApi.ltEq(FilterApi.longColumn(fieldName), (Long) fieldValue);
-                }
-                else if (fieldValue instanceof Integer) {
-                    filter = FilterApi.ltEq(FilterApi.intColumn(fieldName), (Integer) fieldValue);
-                }
-                else if (fieldValue instanceof Float) {
-                    filter = FilterApi.ltEq(FilterApi.floatColumn(fieldName), (Float) fieldValue);
-                }
-                else if (fieldValue instanceof Double) {
-                    filter = FilterApi.ltEq(FilterApi.doubleColumn(fieldName), (Double) fieldValue);
-                }
-                break;
-            case "less_than":
-                if (fieldValue instanceof Long) {
-                    filter = FilterApi.lt(FilterApi.longColumn(fieldName), (Long) fieldValue);
-                }
-                else if (fieldValue instanceof Integer) {
-                    filter = FilterApi.lt(FilterApi.intColumn(fieldName), (Integer) fieldValue);
-                }
-                else if (fieldValue instanceof Float) {
-                    filter = FilterApi.lt(FilterApi.floatColumn(fieldName), (Float) fieldValue);
-                }
-                else if (fieldValue instanceof Double) {
-                    filter = FilterApi.lt(FilterApi.doubleColumn(fieldName), (Double) fieldValue);
-                }
-                break;
-            case "isnull":
-            case "isNull":
-            case "is null":
-                if (fieldValue instanceof Long) {
-                    filter = FilterApi.eq(FilterApi.longColumn(fieldName), null);
-                }
-                else if (fieldValue instanceof Integer) {
-                    filter = FilterApi.eq(FilterApi.intColumn(fieldName), null);
-                }
-                else if (fieldValue instanceof Float) {
-                    filter = FilterApi.eq(FilterApi.floatColumn(fieldName), null);
-                }
-                else if (fieldValue instanceof Double) {
-                    filter = FilterApi.eq(FilterApi.doubleColumn(fieldName), null);
-                }
-                break;
-            case "isnotnull":
-            case "isNotNull":
-            case "is not null":
-                if (fieldValue instanceof Long) {
-                    filter = FilterApi.notEq(FilterApi.longColumn(fieldName), null);
-                }
-                else if (fieldValue instanceof Integer) {
-                    filter = FilterApi.notEq(FilterApi.intColumn(fieldName), null);
-                }
-                else if (fieldValue instanceof Float) {
-                    filter = FilterApi.notEq(FilterApi.floatColumn(fieldName), null);
-                }
-                else if (fieldValue instanceof Double) {
-                    filter = FilterApi.notEq(FilterApi.doubleColumn(fieldName), null);
-                }
-                break;
-        }
+  private LogicalExpression handleCompareFunction(FunctionHolderExpression functionHolderExpression, Set<LogicalExpression> value) {
+    List<LogicalExpression> newArgs = new ArrayList<>();
+
+    for (LogicalExpression arg : functionHolderExpression.args) {
+      LogicalExpression newArg = arg.accept(this, value);
+      if (newArg == null) {
+        return null;
+      }
+      newArgs.add(newArg);
+    }
+
+    String funcName = ((DrillSimpleFuncHolder) functionHolderExpression.getHolder()).getRegisteredNames()[0];
 
-        return filter;
+    switch (funcName) {
+    case FunctionGenerationHelper.EQ :
+      return new ParquetPredicates.EqualPredicate(newArgs.get(0), newArgs.get(1));
+    case FunctionGenerationHelper.GT :
+      return new ParquetPredicates.GTPredicate(newArgs.get(0), newArgs.get(1));
+    case FunctionGenerationHelper.GE :
+      return new ParquetPredicates.GEPredicate(newArgs.get(0), newArgs.get(1));
+    case FunctionGenerationHelper.LT :
+      return new ParquetPredicates.LTPredicate(newArgs.get(0), newArgs.get(1));
+    case FunctionGenerationHelper.LE :
+      return new ParquetPredicates.LEPredicate(newArgs.get(0), newArgs.get(1));
+    case FunctionGenerationHelper.NE :
+      return new ParquetPredicates.NEPredicate(newArgs.get(0), newArgs.get(1));
+    default:
+      return null;
     }
+  }
+
+  private LogicalExpression handleCastFunction(FunctionHolderExpression functionHolderExpression, Set<LogicalExpression> value) {
+    for (LogicalExpression arg : functionHolderExpression.args) {
+      LogicalExpression newArg = arg.accept(this, value);
+      if (newArg == null) {
+        return null;
+      }
+    }
+
+    // Note: this helper is not invoked anywhere in this class; cast expressions
+    // are rewritten inline in visitFunctionHolderExpression above.
+    return null;
+  }
+
+  private static boolean isCompareFunction(String funcName) {
+    return COMPARE_FUNCTIONS_SET.contains(funcName);
+  }
+
+  private static final ImmutableSet<String> COMPARE_FUNCTIONS_SET;
+
+  static {
+    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+    COMPARE_FUNCTIONS_SET = builder
+        .add(FunctionGenerationHelper.EQ)
+        .add(FunctionGenerationHelper.GT)
+        .add(FunctionGenerationHelper.GE)
+        .add(FunctionGenerationHelper.LT)
+        .add(FunctionGenerationHelper.LE)
+        .add(FunctionGenerationHelper.NE)
+        .build();
+  }
+
 }

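The OR/AND leg handling in visitBooleanOperator above is the core safety rule of this builder. The following standalone sketch (hypothetical Pred/And/Or types, not Drill classes) restates why a partially converted AND is still safe to push down while a partially converted OR is not:

import java.util.ArrayList;
import java.util.List;

class LegPruningSketch {
  interface Pred {}
  static class And implements Pred { final List<Pred> legs; And(List<Pred> l) { legs = l; } }
  static class Or implements Pred { final List<Pred> legs; Or(List<Pred> l) { legs = l; } }

  // Each converted leg is null when it is not qualified for pushdown.
  static Pred combine(boolean isOr, List<Pred> convertedLegs) {
    List<Pred> kept = new ArrayList<>();
    for (Pred leg : convertedLegs) {
      if (leg == null) {
        if (isOr) {
          // Dropping an OR leg would make the pushed-down filter stricter than the
          // query filter: row groups matching only the dropped leg would be pruned
          // incorrectly. The whole OR must be abandoned.
          return null;
        }
        // For AND, keeping a subset of legs yields a weaker filter, which can only
        // keep extra row groups, never drop a matching one -- safe for pruning.
      } else {
        kept.add(leg);
      }
    }
    if (kept.isEmpty()) {
      return null;                  // no leg qualified
    } else if (kept.size() == 1) {
      return kept.get(0);           // single leg: drop the boolean operator
    }
    return isOr ? new Or(kept) : new And(kept);
  }
}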
http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index a8e55b7..71e681b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -26,14 +26,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.avro.generic.GenericData;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
@@ -42,7 +54,11 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ImplicitColumnExplorer;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
@@ -56,6 +72,8 @@ import org.apache.drill.exec.store.parquet.Metadata.ColumnMetadata;
 import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata;
 import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase;
 import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.CompleteWork;
@@ -112,6 +130,13 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private List<SchemaPath> columns;
   private ListMultimap<Integer, RowGroupInfo> mappings;
   private List<RowGroupInfo> rowGroupInfos;
+  private LogicalExpression filter;
+
+  /**
+   * The parquet table metadata may have already been read
+   * from a metadata cache file earlier; we can re-use it during
+   * the ParquetGroupScan and avoid extra loading time.
+   */
   private Metadata.ParquetTableMetadataBase parquetTableMetadata = null;
   private String cacheFileRoot = null;
 
@@ -133,7 +158,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       @JacksonInject StoragePluginRegistry engineRegistry, //
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot, //
-      @JsonProperty("cacheFileRoot") String cacheFileRoot //
+      @JsonProperty("cacheFileRoot") String cacheFileRoot, //
+      @JsonProperty("filter") LogicalExpression filter
   ) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName));
     this.columns = columns;
@@ -149,6 +175,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.entries = entries;
     this.selectionRoot = selectionRoot;
     this.cacheFileRoot = cacheFileRoot;
+    this.filter = filter;
 
     init(null);
   }
@@ -159,7 +186,18 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       ParquetFormatPlugin formatPlugin, //
       String selectionRoot,
       String cacheFileRoot,
-      List<SchemaPath> columns) //
+      List<SchemaPath> columns) throws IOException {
+    this(userName, selection, formatPlugin, selectionRoot, cacheFileRoot, columns, ValueExpressions.BooleanExpression.TRUE);
+  }
+
+  public ParquetGroupScan( //
+      String userName,
+      FileSelection selection, //
+      ParquetFormatPlugin formatPlugin, //
+      String selectionRoot,
+      String cacheFileRoot,
+      List<SchemaPath> columns,
+      LogicalExpression filter) //
       throws IOException {
     super(userName);
     this.formatPlugin = formatPlugin;
@@ -187,6 +225,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       }
     }
 
+    this.filter = filter;
+
     init(fileSelection.getMetaContext());
   }
 
@@ -206,11 +246,12 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.rowGroupInfos = that.rowGroupInfos == null ? null : Lists.newArrayList(that.rowGroupInfos);
     this.selectionRoot = that.selectionRoot;
     this.columnValueCounts = that.columnValueCounts == null ? null : new HashMap<>(that.columnValueCounts);
-    this.columnTypeMap = that.columnTypeMap == null ? null : new HashMap<>(that.columnTypeMap);
+    this.partitionColTypeMap = that.partitionColTypeMap == null ? null : new HashMap<>(that.partitionColTypeMap);
     this.partitionValueMap = that.partitionValueMap == null ? null : new HashMap<>(that.partitionValueMap);
     this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
     this.usedMetadataCache = that.usedMetadataCache;
     this.parquetTableMetadata = that.parquetTableMetadata;
+    this.filter = that.filter;
     this.cacheFileRoot = that.cacheFileRoot;
   }
 
@@ -260,6 +301,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return fileSet;
   }
 
+  public LogicalExpression getFilter() {
+    return this.filter;
+  }
+
+  public void setFilter(LogicalExpression filter) {
+    this.filter = filter;
+  }
+
   @Override
   public boolean hasFiles() {
     return true;
@@ -273,7 +322,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private Set<String> fileSet;
 
   @JsonIgnore
-  private Map<SchemaPath, MajorType> columnTypeMap = Maps.newHashMap();
+  // Only for partition columns: the value is the same within each partition.
+  private Map<SchemaPath, MajorType> partitionColTypeMap = Maps.newHashMap();
 
   /**
    * When reading the very first footer, any column is a potential partition column. So for the first footer, we check
@@ -295,21 +345,21 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
     if (first) {
       if (hasSingleValue(columnMetadata)) {
-        columnTypeMap.put(schemaPath, getType(primitiveType, originalType));
+        partitionColTypeMap.put(schemaPath, getType(primitiveType, originalType));
         return true;
       } else {
         return false;
       }
     } else {
-      if (!columnTypeMap.keySet().contains(schemaPath)) {
+      if (!partitionColTypeMap.keySet().contains(schemaPath)) {
         return false;
       } else {
         if (!hasSingleValue(columnMetadata)) {
-          columnTypeMap.remove(schemaPath);
+          partitionColTypeMap.remove(schemaPath);
           return false;
         }
-        if (!getType(primitiveType, originalType).equals(columnTypeMap.get(schemaPath))) {
-          columnTypeMap.remove(schemaPath);
+        if (!getType(primitiveType, originalType).equals(partitionColTypeMap.get(schemaPath))) {
+          partitionColTypeMap.remove(schemaPath);
           return false;
         }
       }
@@ -317,7 +367,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return true;
   }
 
-  private MajorType getType(PrimitiveTypeName type, OriginalType originalType) {
+  public static MajorType getType(PrimitiveTypeName type, OriginalType originalType) {
     if (originalType != null) {
       switch (originalType) {
         case DECIMAL:
@@ -390,7 +440,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   }
 
   public MajorType getTypeForColumn(SchemaPath schemaPath) {
-    return columnTypeMap.get(schemaPath);
+    return partitionColTypeMap.get(schemaPath);
   }
 
   // Map from file names to maps of column name to partition value mappings
@@ -771,13 +821,13 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
             Object currentValue = column.getMaxValue();
             if (value != null) {
               if (value != currentValue) {
-                columnTypeMap.remove(schemaPath);
+                partitionColTypeMap.remove(schemaPath);
               }
             } else {
               map.put(schemaPath, currentValue);
             }
           } else {
-            columnTypeMap.remove(schemaPath);
+            partitionColTypeMap.remove(schemaPath);
           }
         }
         this.rowCount += rowGroup.getRowCount();
@@ -839,7 +889,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
     return new ParquetRowGroupScan(
-        getUserName(), formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot);
+        getUserName(), formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot, filter);
   }
 
   private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {
@@ -893,12 +943,16 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
             Path.getPathWithoutSchemeAndAuthority(new Path(cacheFileRoot)).toString();
       cacheFileString = ", cacheFileRoot=" + str;
     }
+    final String filterStr = filter == null || filter.equals(ValueExpressions.BooleanExpression.TRUE) ? "" : ", filter=" + ExpressionStringBuilder.toString(this.filter);
+
     return "ParquetGroupScan [entries=" + entries
         + ", selectionRoot=" + selectionRoot
         + ", numFiles=" + getEntries().size()
         + ", usedMetadataFile=" + usedMetadataCache
+        + filterStr
         + cacheFileString
-        + ", columns=" + columns + "]";
+        + ", columns=" + columns
+        + "]";
   }
 
   @Override
@@ -1000,6 +1054,88 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
   @Override
   public List<SchemaPath> getPartitionColumns() {
-    return new ArrayList<>(columnTypeMap.keySet());
+    return new ArrayList<>(partitionColTypeMap.keySet());
+  }
+
+  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+      FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
+    if (fileSet.size() == 1 ||
+        ! (parquetTableMetadata.isRowGroupPrunable()) ||
+        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
+        ) {
+      // Stop pruning in 3 cases:
+      //    -  a single parquet file,
+      //    -  the metadata format does not support row group level filter pruning,
+      //    -  the number of row groups exceeds PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
+      return null;
+    }
+
+    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
+
+    final List<RowGroupMetadata> qualifiedRGs = new ArrayList<>(parquetTableMetadata.getFiles().size());
+    Set<String> qualifiedFileNames = Sets.newHashSet(); // a HashSet keeps file names unique.
+
+    ParquetFilterPredicate filterPredicate = null;
+
+    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+      final ImplicitColumnExplorer columnExplorer = new ImplicitColumnExplorer(optionManager, this.columns);
+      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(file.getPath(), selectionRoot);
+
+      for (RowGroupMetadata rowGroup : file.getRowGroups()) {
+        ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+            parquetTableMetadata,
+            rowGroup.getColumns(),
+            implicitColValues);
+
+        Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+
+        if (filterPredicate == null) {
+          ErrorCollector errorCollector = new ErrorCollectorImpl();
+          LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+              filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
+
+          if (errorCollector.hasErrors()) {
+            logger.error("{} error(s) encountered when materialize filter expression : {}",
+                errorCollector.getErrorCount(), errorCollector.toErrorString());
+            return null;
+          }
+          //    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
+
+          Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+          filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
+              materializedFilter, constantBoundaries, udfUtilities);
+
+          if (filterPredicate == null) {
+            return null;
+          }
+        }
+
+        if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
+          continue;
+        }
+
+        qualifiedRGs.add(rowGroup);
+        qualifiedFileNames.add(file.getPath());  // TODO: optimize when one file contains many row groups.
+      }
+    }
+
+    if (qualifiedFileNames.size() == fileSet.size()) {
+      // No row groups were pruned; return null so the caller keeps the original group scan.
+      logger.debug("applyFilter did not prune any row groups.");
+      return null;
+    } else if (qualifiedFileNames.size() == 0) {
+      logger.warn("All rowgroups have been filtered out. Add back one to get schema from scannner");
+      qualifiedFileNames.add(fileSet.iterator().next());
+    }
+
+    try {
+      FileSelection newSelection = new FileSelection(null, Lists.newArrayList(qualifiedFileNames), getSelectionRoot(), cacheFileRoot, false);
+      logger.info("applyFilter {} reduce parquet file # from {} to {}", ExpressionStringBuilder.toString(filterExpr), fileSet.size(), qualifiedFileNames.size());
+      return this.clone(newSelection);
+    } catch (IOException e) {
+      logger.warn("Could not apply filter prune due to Exception : {}", e);
+      return null;
+    }
   }
+
 }

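applyFilter above prunes a row group only when ParquetRGFilterEvaluator.canDrop proves, from the group's column statistics, that no row can satisfy the predicate. A minimal sketch of that decision for a single "col < constant" comparison, with an assumed ColStats holder (not Drill's ColumnStatistics API):

class RowGroupPruneSketch {
  static class ColStats {
    final long min;
    final long max;
    ColStats(long min, long max) { this.min = min; this.max = max; }
  }

  // The decision must be conservative: return true only when the statistics
  // prove a match is impossible; false merely means the group must be scanned.
  static boolean canDropLessThan(ColStats stats, long constant) {
    // Every non-null value in the row group is >= constant, so "col < constant"
    // matches no row. (Nulls never satisfy a comparison predicate, so they do
    // not prevent dropping the group.)
    return stats.min >= constant;
  }
}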
http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 10c817b..6f870f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,15 +15,18 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
-import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
@@ -35,108 +36,108 @@ import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 
-import com.google.common.collect.ImmutableList;
+import java.util.concurrent.TimeUnit;
 
 public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
-    public static final StoragePluginOptimizerRule getFilterOnProject(final OptimizerRulesContext context){
-        return new ParquetPushDownFilter(
-                RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))),
-                "ParquetPushDownFilter:Filter_On_Project", context) {
-
-            @Override
-            public boolean matches(RelOptRuleCall call) {
-                if (!enabled) {
-                    return false;
-                }
-                final ScanPrel scan = call.rel(2);
-                if (scan.getGroupScan() instanceof ParquetGroupScan) {
-                    return super.matches(call);
-                }
-                return false;
-            }
-
-            @Override
-            public void onMatch(RelOptRuleCall call) {
-                final FilterPrel filterRel = call.rel(0);
-                final ProjectPrel projectRel = call.rel(1);
-                final ScanPrel scanRel = call.rel(2);
-                doOnMatch(call, filterRel, projectRel, scanRel);
-            };
-        };
-    }
 
-    public static final StoragePluginOptimizerRule getFilterOnScan(final OptimizerRulesContext context){
-        return new ParquetPushDownFilter(
-                RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
-                "ParquetPushDownFilter:Filter_On_Scan", context) {
-
-            @Override
-            public boolean matches(RelOptRuleCall call) {
-                if (!enabled) {
-                    return false;
-                }
-                final ScanPrel scan = call.rel(1);
-                if (scan.getGroupScan() instanceof ParquetGroupScan) {
-                    return super.matches(call);
-                }
-                return false;
-            }
-
-            @Override
-            public void onMatch(RelOptRuleCall call) {
-                final FilterPrel filterRel = call.rel(0);
-                final ScanPrel scanRel = call.rel(1);
-                doOnMatch(call, filterRel, null, scanRel);
-            }
-        };
-    }
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetPushDownFilter.class);
 
-    private final OptimizerRulesContext context;
-    // private final boolean useNewReader;
-    protected final boolean enabled;
+  public static RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
+    return new ParquetPushDownFilter(
+        RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))),
+        "ParquetPushDownFilter:Filter_On_Project", optimizerRulesContext) {
 
-    private ParquetPushDownFilter(RelOptRuleOperand operand, String id, OptimizerRulesContext context) {
-        super(operand, id);
-        this.context = context;
-        this.enabled = context.getPlannerSettings().isParquetFilterPushEnabled();
-        // this.useNewReader = context.getPlannerSettings()getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val;
+      @Override
+      public boolean matches(RelOptRuleCall call) {
+        final ScanPrel scan = call.rel(2);
+        if (scan.getGroupScan() instanceof ParquetGroupScan) {
+          return super.matches(call);
+        }
+        return false;
+      }
+
+      @Override
+      public void onMatch(RelOptRuleCall call) {
+        final FilterPrel filterRel = call.rel(0);
+        final ProjectPrel projectRel = call.rel(1);
+        final ScanPrel scanRel = call.rel(2);
+        doOnMatch(call, filterRel, projectRel, scanRel);
+      }
+
+    };
+  }
+
+  public static StoragePluginOptimizerRule getFilterOnScan(OptimizerRulesContext optimizerContext) {
+    return new ParquetPushDownFilter(
+        RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+        "ParquetPushDownFilter:Filter_On_Scan", optimizerContext) {
+
+      @Override
+      public boolean matches(RelOptRuleCall call) {
+        final ScanPrel scan = call.rel(1);
+        if (scan.getGroupScan() instanceof ParquetGroupScan) {
+          return super.matches(call);
+        }
+        return false;
+      }
+
+      @Override
+      public void onMatch(RelOptRuleCall call) {
+        final FilterPrel filterRel = call.rel(0);
+        final ScanPrel scanRel = call.rel(1);
+        doOnMatch(call, filterRel, null, scanRel);
+      }
+    };
+  }
+
+  // private final boolean useNewReader;
+  protected final OptimizerRulesContext optimizerContext;
+
+  private ParquetPushDownFilter(RelOptRuleOperand operand, String id, OptimizerRulesContext optimizerContext) {
+    super(operand, id);
+    this.optimizerContext = optimizerContext;
+  }
+
+  protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel project, ScanPrel scan) {
+    ParquetGroupScan groupScan = (ParquetGroupScan) scan.getGroupScan();
+    if (groupScan.getFilter() != null && !groupScan.getFilter().equals(ValueExpressions.BooleanExpression.TRUE)) {
+      return;
     }
 
-    protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel project, ScanPrel scan) {
-        ParquetGroupScan groupScan = (ParquetGroupScan) scan.getGroupScan();
-        if (groupScan.getFilter() != null) {
-            return;
-        }
+    RexNode condition = null;
+    if (project == null) {
+      condition = filter.getCondition();
+    } else {
+      // get the filter as if it were below the projection.
+      condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
+    }
 
-        RexNode condition = null;
-        if(project == null){
-            condition = filter.getCondition();
-        }else{
-            // get the filter as if it were below the projection.
-            condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
-        }
+    if (condition == null || condition.isAlwaysTrue()) {
+      return;
+    }
 
-        LogicalExpression conditionExp = DrillOptiq.toDrill(
-                new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
-        ParquetFilterBuilder parquetFilterBuilder = new ParquetFilterBuilder(groupScan,
-                conditionExp);
-        ParquetGroupScan newGroupScan = parquetFilterBuilder.parseTree();
-        if (newGroupScan == null) {
-            return; // no filter pushdown so nothing to apply.
-        }
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+
+    Stopwatch timer = Stopwatch.createStarted();
+    final GroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
+        optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
+    logger.info("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS));
 
-        final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(),
-                newGroupScan, scan.getRowType());
+    if (newGroupScan == null) {
+      return;
+    }
 
-        RelNode inputPrel = newScanPrel;
+    final ScanPrel newScanRel = ScanPrel.create(scan, scan.getTraitSet(), newGroupScan, scan.getRowType());
 
-        if(project != null){
-            inputPrel = project.copy(project.getTraitSet(), ImmutableList.of(inputPrel));
-        }
+    RelNode inputRel = newScanRel;
 
-        // Normally we could eliminate the filter if all expressions were pushed down;
-        // however, the Parquet filter implementation is type specific (whereas Drill is not)
-        final RelNode newFilter = filter.copy(filter.getTraitSet(), ImmutableList.of(inputPrel));
-        call.transformTo(newFilter);
+    if (project != null) {
+      inputRel = project.copy(project.getTraitSet(), ImmutableList.of(inputRel));
     }
+
+    final RelNode newFilter = filter.copy(filter.getTraitSet(), ImmutableList.<RelNode>of(inputRel));
+
+    call.transformTo(newFilter);
+  }
 }

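Note that doOnMatch re-creates the FilterPrel on top of the new scan rather than eliminating it: pruning works at row group granularity, so surviving row groups can still contain non-matching rows. A toy illustration with made-up min/max values:

class WhyKeepFilterSketch {
  public static void main(String[] args) {
    long[][] rowGroupMinMax = { {0, 9}, {10, 19}, {20, 29} }; // per-group min/max of "col"
    long constant = 12;                                       // predicate: col < 12
    int kept = 0;
    for (long[] mm : rowGroupMinMax) {
      boolean canDrop = mm[0] >= constant; // stats prove no row can match
      if (!canDrop) {
        kept++;
      }
    }
    // kept == 2: groups [0,9] and [10,19] survive, but [10,19] still holds rows
    // with col >= 12, which the retained Filter removes at execution time.
    System.out.println("row groups kept: " + kept);
  }
}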
http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
new file mode 100644
index 0000000..bc4be13
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.collect.Sets;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.expr.stat.RangeExprEvaluator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatCollector;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetFooterStatCollector;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class ParquetRGFilterEvaluator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRGFilterEvaluator.class);
+
+  public static boolean evalFilter(LogicalExpression expr, ParquetMetadata footer, int rowGroupIndex,
+      OptionManager options, FragmentContext fragmentContext) {
+    final HashMap<String, String> emptyMap = new HashMap<String, String>();
+    return evalFilter(expr, footer, rowGroupIndex, options, fragmentContext, emptyMap);
+  }
+
+  public static boolean evalFilter(LogicalExpression expr, ParquetMetadata footer, int rowGroupIndex,
+      OptionManager options, FragmentContext fragmentContext, Map<String, String> implicitColValues) {
+    // figure out the set of columns referenced in expression.
+    final Set<SchemaPath> schemaPathsInExpr = expr.accept(new FieldReferenceFinder(), null);
+    final ColumnStatCollector columnStatCollector = new ParquetFooterStatCollector(footer, rowGroupIndex, implicitColValues, true, options);
+
+    Map<SchemaPath, ColumnStatistics> columnStatisticsMap = columnStatCollector.collectColStat(schemaPathsInExpr);
+
+    boolean canDrop = canDrop(expr, columnStatisticsMap, footer.getBlocks().get(rowGroupIndex).getRowCount(), fragmentContext, fragmentContext.getFunctionRegistry());
+    return canDrop;
+  }
+
+
+  public static boolean canDrop(ParquetFilterPredicate parquetPredicate, Map<SchemaPath,
+      ColumnStatistics> columnStatisticsMap, long rowCount) {
+    boolean canDrop = false;
+    if (parquetPredicate != null) {
+      RangeExprEvaluator rangeExprEvaluator = new RangeExprEvaluator(columnStatisticsMap, rowCount);
+      canDrop = parquetPredicate.canDrop(rangeExprEvaluator);
+    }
+    return canDrop;
+  }
+
+
+  public static boolean canDrop(LogicalExpression expr, Map<SchemaPath, ColumnStatistics> columnStatisticsMap,
+      long rowCount, UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry) {
+    ErrorCollector errorCollector = new ErrorCollectorImpl();
+    LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+        expr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
+
+    if (errorCollector.hasErrors()) {
+      logger.error("{} error(s) encountered when materialize filter expression : {}",
+          errorCollector.getErrorCount(), errorCollector.toErrorString());
+      return false;
+    }
+
+    Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+    ParquetFilterPredicate parquetPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
+        materializedFilter, constantBoundaries, udfUtilities);
+
+    return canDrop(parquetPredicate, columnStatisticsMap, rowCount);
+  }
+
+  /**
+   * Search through a LogicalExpression, finding all internal schema path references and returning them in a set.
+   */
+  public static class FieldReferenceFinder extends AbstractExprVisitor<Set<SchemaPath>, Void, RuntimeException> {
+    @Override
+    public Set<SchemaPath> visitSchemaPath(SchemaPath path, Void value) {
+      Set<SchemaPath> set = Sets.newHashSet();
+      set.add(path);
+      return set;
+    }
+
+    @Override
+    public Set<SchemaPath> visitUnknown(LogicalExpression e, Void value) {
+      Set<SchemaPath> paths = Sets.newHashSet();
+      for (LogicalExpression ex : e) {
+        paths.addAll(ex.accept(this, null));
+      }
+      return paths;
+    }
+  }
+}

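FieldReferenceFinder above follows a common visitor idiom: leaves contribute themselves, and every other node returns the union of its children's results. A generic sketch of the same pattern with hypothetical Node/Leaf types (not Drill's expression classes):

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReferenceFinderSketch {
  interface Node { List<Node> children(); }

  static class Leaf implements Node {
    final String path;
    Leaf(String path) { this.path = path; }
    public List<Node> children() { return Collections.emptyList(); }
  }

  static Set<String> findPaths(Node node) {
    Set<String> paths = new HashSet<>();
    if (node instanceof Leaf) {
      paths.add(((Leaf) node).path);      // like visitSchemaPath: a leaf contributes itself
    } else {
      for (Node child : node.children()) {
        paths.addAll(findPaths(child));   // like visitUnknown: union over all children
      }
    }
    return paths;
  }
}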
http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index cffcdac..f62efb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -50,6 +51,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   private final ParquetFormatPlugin formatPlugin;
   private final List<RowGroupReadEntry> rowGroupReadEntries;
   private final List<SchemaPath> columns;
+  private LogicalExpression filter;
   private String selectionRoot;
 
   @JsonCreator
@@ -60,11 +62,12 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
       @JsonProperty("columns") List<SchemaPath> columns, //
-      @JsonProperty("selectionRoot") String selectionRoot //
+      @JsonProperty("selectionRoot") String selectionRoot, //
+      @JsonProperty("filter") LogicalExpression filter
   ) throws ExecutionSetupException {
     this(userName, (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
             formatConfig == null ? new ParquetFormatConfig() : formatConfig),
-        rowGroupReadEntries, columns, selectionRoot);
+        rowGroupReadEntries, columns, selectionRoot, filter);
   }
 
   public ParquetRowGroupScan( //
@@ -72,7 +75,8 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
       ParquetFormatPlugin formatPlugin, //
       List<RowGroupReadEntry> rowGroupReadEntries, //
       List<SchemaPath> columns, //
-      String selectionRoot //
+      String selectionRoot, //
+      LogicalExpression filter
   ) {
     super(userName);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
@@ -80,6 +84,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns;
     this.selectionRoot = selectionRoot;
+    this.filter = filter;
   }
 
   @JsonProperty("entries")
@@ -114,7 +119,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter);
   }
 
   @Override
@@ -126,6 +131,10 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     return columns;
   }
 
+  public LogicalExpression getFilter() {
+    return filter;
+  }
+
   @Override
   public int getOperatorType() {
     return CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE;
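
One consequence of threading filter through both constructors and getNewWithChildren() is that the pushed-down predicate now survives plan serialization and operator cloning alike; before this change a cloned sub-scan had no way to carry it. A sketch of the invariant, where scan stands for any existing ParquetRowGroupScan instance:

    // hypothetical check; cloning must preserve the pushed-down filter
    ParquetRowGroupScan copy =
        (ParquetRowGroupScan) scan.getNewWithChildren(Collections.<PhysicalOperator>emptyList());
    assert copy.getFilter() == scan.getFilter();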

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index a14bab5..fa7f44e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -67,7 +67,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
     if (!columnExplorer.isStarQuery()) {
       rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),
-          rowGroupScan.getRowGroupReadEntries(), columnExplorer.getTableColumns(), rowGroupScan.getSelectionRoot());
+          rowGroupScan.getRowGroupReadEntries(), columnExplorer.getTableColumns(), rowGroupScan.getSelectionRoot(),
+          rowGroupScan.getFilter());
       rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index 57c0a66..be27f3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -32,14 +32,15 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 public class ParquetToDrillTypeConverter {
 
-  private static TypeProtos.MinorType getDecimalType(SchemaElement schemaElement) {
-    return schemaElement.getPrecision() <= 28 ? TypeProtos.MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE;
+
+  private static TypeProtos.MinorType getDecimalType(int precision) {
+    return precision <= 28 ? TypeProtos.MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE;
   }
 
   private static TypeProtos.MinorType getMinorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
-                                                   SchemaElement schemaElement, OptionManager options) {
+                                                   ConvertedType convertedType, int precision, int scale,
+      OptionManager options) {
 
-    ConvertedType convertedType = schemaElement.getConverted_type();
 
     switch (primitiveTypeName) {
       case BINARY:
@@ -51,7 +52,7 @@ public class ParquetToDrillTypeConverter {
             return (TypeProtos.MinorType.VARCHAR);
           case DECIMAL:
             ParquetReaderUtility.checkDecimalTypeEnabled(options);
-            return (getDecimalType(schemaElement));
+            return (getDecimalType(precision));
           default:
             return (TypeProtos.MinorType.VARBINARY);
         }
@@ -106,7 +107,7 @@ public class ParquetToDrillTypeConverter {
           return TypeProtos.MinorType.VARBINARY;
         } else if (convertedType == ConvertedType.DECIMAL) {
           ParquetReaderUtility.checkDecimalTypeEnabled(options);
-          return getDecimalType(schemaElement);
+          return getDecimalType(precision);
         } else if (convertedType == ConvertedType.INTERVAL) {
           return TypeProtos.MinorType.INTERVAL;
         }
@@ -118,12 +119,20 @@ public class ParquetToDrillTypeConverter {
   public static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
                                           TypeProtos.DataMode mode, SchemaElement schemaElement,
                                           OptionManager options) {
-    MinorType minorType = getMinorType(primitiveTypeName, length, schemaElement, options);
+    return toMajorType(primitiveTypeName, length, mode, schemaElement.getConverted_type(),
+        schemaElement.getPrecision(), schemaElement.getScale(), options);
+  }
+
+  public static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
+      TypeProtos.DataMode mode, ConvertedType convertedType, int precision, int scale,
+      OptionManager options) {
+    MinorType minorType = getMinorType(primitiveTypeName, length, convertedType, precision, scale, options);
     TypeProtos.MajorType.Builder typeBuilder = TypeProtos.MajorType.newBuilder().setMinorType(minorType).setMode(mode);
 
     if (CoreDecimalUtility.isDecimalType(minorType)) {
-      typeBuilder.setPrecision(schemaElement.getPrecision()).setScale(schemaElement.getScale());
+      typeBuilder.setPrecision(precision).setScale(scale);
     }
     return typeBuilder.build();
   }
+
 }
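
The point of the new overload is that callers holding only cached metadata values, rather than a Thrift SchemaElement, can still map Parquet types; the ParquetMetaStatCollector added below is such a caller. A hedged sketch of a direct call, with illustrative precision/scale values and options standing for the session's OptionManager (note the DECIMAL path requires the decimal type option to be enabled):

    // DECIMAL(18, 2) stored as FIXED_LEN_BYTE_ARRAY(16): precision <= 28,
    // so getDecimalType() picks DECIMAL28SPARSE, and precision/scale are
    // carried onto the resulting MajorType.
    TypeProtos.MajorType t = ParquetToDrillTypeConverter.toMajorType(
        PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 16,
        TypeProtos.DataMode.OPTIONAL, ConvertedType.DECIMAL, 18, 2, options);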

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ColumnStatCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ColumnStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ColumnStatCollector.java
new file mode 100644
index 0000000..8f93c8a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ColumnStatCollector.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.stat;
+
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface ColumnStatCollector {
+  /**
+   * Given a set of fields (SchemaPath), return a mapping from each field to its corresponding ColumnStatistics.
+   *
+   * @param fields the columns referenced in the filter
+   * @return a map from each field to its ColumnStatistics
+   */
+  Map<SchemaPath, ColumnStatistics> collectColStat(Set<SchemaPath> fields);
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ColumnStatistics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ColumnStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ColumnStatistics.java
new file mode 100644
index 0000000..7bad491
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ColumnStatistics.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.stat;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.parquet.column.statistics.Statistics;
+
+public class ColumnStatistics {
+  private final Statistics statistics;
+  private final TypeProtos.MajorType majorType;
+
+  public ColumnStatistics(final Statistics statistics, final TypeProtos.MajorType majorType) {
+    this.statistics = statistics;
+    this.majorType = majorType;
+  }
+
+  public Statistics getStatistics() {
+    return this.statistics;
+  }
+
+  public TypeProtos.MajorType getMajorType() {
+    return this.majorType;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
new file mode 100644
index 0000000..6294655
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.stat;
+
+import com.google.common.base.Stopwatch;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetToDrillTypeConverter;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.joda.time.DateTimeConstants;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ParquetFooterStatCollector implements ColumnStatCollector {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetFooterStatCollector.class);
+
+  private final ParquetMetadata footer;
+  private final int rowGroupIndex;
+  private final OptionManager options;
+  private final Map<String, String> implicitColValues;
+  private final boolean autoCorrectCorruptDates;
+
+  public ParquetFooterStatCollector(ParquetMetadata footer, int rowGroupIndex, Map<String, String> implicitColValues,
+      boolean autoCorrectCorruptDates, OptionManager options) {
+    this.footer = footer;
+    this.rowGroupIndex = rowGroupIndex;
+
+    // Reasons to pass implicit columns and their values:
+    // 1. To differentiate implicit columns from regular non-existent columns. Implicit columns do
+    //    not exist in parquet metadata, so without this knowledge an implicit column would be
+    //    treated as a non-existent column, and a condition on a non-existent column leads to
+    //    canDrop = true, which is not the right behavior for a condition on an implicit column.
+
+    // 2. To pass in the implicit column names with their corresponding values, wrapped in Statistics
+    //    with min and max set to the same value. This expands the possibility of pruning.
+    //    For example, take regCol = 5 or dir0 = 1995. If regCol is not a partition column, the
+    //    current partition pruning logic would not prune anything; passing the implicit column
+    //    values may allow us to prune some row groups using the conditions regCol = 5 or dir0 = 1995.
+
+    this.implicitColValues = implicitColValues;
+    this.autoCorrectCorruptDates = autoCorrectCorruptDates;
+    this.options = options;
+  }
+
+  @Override
+  public Map<SchemaPath, ColumnStatistics> collectColStat(Set<SchemaPath> fields) {
+    Stopwatch timer = Stopwatch.createStarted();
+
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
+        ParquetReaderUtility.detectCorruptDates(footer, new ArrayList<>(fields), autoCorrectCorruptDates);
+
+    // map from column name to ColumnDescriptor
+    Map<SchemaPath, ColumnDescriptor> columnDescMap = new HashMap<>();
+
+    // map from column name to ColumnChunkMetaData
+    final Map<SchemaPath, ColumnChunkMetaData> columnChkMetaMap = new HashMap<>();
+
+    // map from column name to MajorType
+    final Map<SchemaPath, TypeProtos.MajorType> columnTypeMap = new HashMap<>();
+
+    // map from column name to SchemaElement
+    final Map<SchemaPath, SchemaElement> schemaElementMap = new HashMap<>();
+
+    // map from column name to column statistics.
+    final Map<SchemaPath, ColumnStatistics> statMap = new HashMap<>();
+
+    final org.apache.parquet.format.FileMetaData fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
+
+    for (final ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
+      final SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getPath());
+      if (fields.contains(schemaPath)) {
+        columnDescMap.put(schemaPath, column);
+      }
+    }
+
+    for (final SchemaElement se : fileMetaData.getSchema()) {
+      final SchemaPath schemaPath = SchemaPath.getSimplePath(se.getName());
+      if (fields.contains(schemaPath)) {
+        schemaElementMap.put(schemaPath, se);
+      }
+    }
+
+    for (final ColumnChunkMetaData colMetaData: footer.getBlocks().get(rowGroupIndex).getColumns()) {
+      final SchemaPath schemaPath = SchemaPath.getCompoundPath(colMetaData.getPath().toArray());
+      if (fields.contains(schemaPath)) {
+        columnChkMetaMap.put(schemaPath, colMetaData);
+      }
+    }
+
+    for (final SchemaPath path : fields) {
+      if (columnDescMap.containsKey(path) && schemaElementMap.containsKey(path) && columnChkMetaMap.containsKey(path)) {
+        ColumnDescriptor columnDesc =  columnDescMap.get(path);
+        SchemaElement se = schemaElementMap.get(path);
+        ColumnChunkMetaData metaData = columnChkMetaMap.get(path);
+
+        TypeProtos.MajorType type = ParquetToDrillTypeConverter.toMajorType(columnDesc.getType(), se.getType_length(),
+            getDataMode(columnDesc), se, options);
+
+        columnTypeMap.put(path, type);
+
+        Statistics stat = metaData.getStatistics();
+        if (type.getMinorType() == TypeProtos.MinorType.DATE) {
+          stat = convertDateStatIfNecessary(metaData.getStatistics(), containsCorruptDates);
+        }
+
+        statMap.put(path, new ColumnStatistics(stat, type));
+      } else {
+        final String columnName = path.getRootSegment().getPath();
+        if (implicitColValues.containsKey(columnName)) {
+          TypeProtos.MajorType type = Types.required(TypeProtos.MinorType.VARCHAR);
+          Statistics stat = new BinaryStatistics();
+          stat.setNumNulls(0);
+          byte[] val = implicitColValues.get(columnName).getBytes();
+          stat.setMinMaxFromBytes(val, val);
+          statMap.put(path, new ColumnStatistics(stat, type));
+        }
+      }
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Took {} ms to collect column statistics for row group", timer.elapsed(TimeUnit.MILLISECONDS));
+    }
+
+    return statMap;
+  }
+
+  private static TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+    if (column.getMaxRepetitionLevel() > 0) {
+      return TypeProtos.DataMode.REPEATED;
+    } else if (column.getMaxDefinitionLevel() == 0) {
+      return TypeProtos.DataMode.REQUIRED;
+    } else {
+      return TypeProtos.DataMode.OPTIONAL;
+    }
+  }
+
+  public static Statistics convertDateStatIfNecessary(Statistics stat,
+      ParquetReaderUtility.DateCorruptionStatus containsCorruptDates) {
+    IntStatistics dateStat = (IntStatistics) stat;
+    LongStatistics dateMLS = new LongStatistics();
+
+    boolean isDateCorrect = containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+
+    // Only do conversion when stat is NOT empty.
+    if (!dateStat.isEmpty()) {
+      dateMLS.setMinMax(
+          convertToDrillDateValue(dateStat.getMin(), isDateCorrect),
+          convertToDrillDateValue(dateStat.getMax(), isDateCorrect));
+      dateMLS.setNumNulls(dateStat.getNumNulls());
+    }
+
+    return dateMLS;
+  }
+
+  private static long convertToDrillDateValue(int dateValue, boolean isDateCorrect) {
+    // See DRILL-4203 for the background regarding date type corruption issue in Drill CTAS prior to 1.9.0 release.
+    if (isDateCorrect) {
+      return dateValue * (long) DateTimeConstants.MILLIS_PER_DAY;
+    } else {
+      // cast to long before multiplying to avoid int overflow, as in the branch above
+      return (dateValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * (long) DateTimeConstants.MILLIS_PER_DAY;
+    }
+  }
+
+}
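
The date handling above boils down to one conversion: Parquet stores DATE as days since the Unix epoch, the statistics used for pruning carry milliseconds, and files written by CTAS before the 1.9.0 fix need the DRILL-4203 shift undone first. The same arithmetic as a standalone sketch, in a hypothetical helper reusing the constant referenced above:

    // Sketch of the conversion done by convertToDrillDateValue() above.
    static long toDrillDateMillis(int parquetDays, boolean metadataShowsNoCorruption) {
      int days = metadataShowsNoCorruption
          ? parquetDays
          : parquetDays - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT;
      // widen to long before multiplying; realistic day counts overflow int otherwise
      return days * (long) DateTimeConstants.MILLIS_PER_DAY;
    }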

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
new file mode 100644
index 0000000..3fe10c8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.stat;
+
+import com.google.common.base.Stopwatch;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.parquet.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.FloatStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.joda.time.DateTimeConstants;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ParquetMetaStatCollector implements ColumnStatCollector {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetMetaStatCollector.class);
+
+  private final Metadata.ParquetTableMetadataBase parquetTableMetadata;
+  private final List<? extends Metadata.ColumnMetadata> columnMetadataList;
+  private final Map<String, String> implicitColValues;
+
+  public ParquetMetaStatCollector(Metadata.ParquetTableMetadataBase parquetTableMetadata,
+      List<? extends Metadata.ColumnMetadata> columnMetadataList, Map<String, String> implicitColValues) {
+    this.parquetTableMetadata = parquetTableMetadata;
+    this.columnMetadataList = columnMetadataList;
+
+    // Reasons to pass implicit columns and their values:
+    // 1. To differentiate implicit columns from regular non-existent columns. Implicit columns do
+    //    not exist in parquet metadata, so without this knowledge an implicit column would be
+    //    treated as a non-existent column, and a condition on a non-existent column leads to
+    //    canDrop = true, which is not the right behavior for a condition on an implicit column.
+
+    // 2. To pass in the implicit column names with their corresponding values, wrapped in Statistics
+    //    with min and max set to the same value. This expands the possibility of pruning.
+    //    For example, take regCol = 5 or dir0 = 1995. If regCol is not a partition column, the
+    //    current partition pruning logic would not prune anything; passing the implicit column
+    //    values may allow us to prune some row groups using the conditions regCol = 5 or dir0 = 1995.
+
+    this.implicitColValues = implicitColValues;
+  }
+
+  @Override
+  public Map<SchemaPath, ColumnStatistics> collectColStat(Set<SchemaPath> fields) {
+    Stopwatch timer = Stopwatch.createStarted();
+
+    // map from column to ColumnMetadata
+    final Map<SchemaPath, Metadata.ColumnMetadata> columnMetadataMap = new HashMap<>();
+
+    // map from column name to column statistics.
+    final Map<SchemaPath, ColumnStatistics> statMap = new HashMap<>();
+
+    for (final Metadata.ColumnMetadata columnMetadata : columnMetadataList) {
+      SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.getName());
+      columnMetadataMap.put(schemaPath, columnMetadata);
+    }
+
+    for (final SchemaPath schemaPath : fields) {
+      final PrimitiveType.PrimitiveTypeName primitiveType;
+      final OriginalType originalType;
+
+      final Metadata.ColumnMetadata columnMetadata = columnMetadataMap.get(schemaPath);
+
+      if (columnMetadata != null) {
+        final Object min = columnMetadata.getMinValue();
+        final Object max = columnMetadata.getMaxValue();
+        final Long numNull = columnMetadata.getNulls();
+
+        primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
+        originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
+        final Integer repetitionLevel = this.parquetTableMetadata.getRepetitionLevel(columnMetadata.getName());
+
+        statMap.put(schemaPath, getStat(min, max, numNull, primitiveType, originalType, repetitionLevel));
+      } else {
+        final String columnName = schemaPath.getRootSegment().getPath();
+        if (implicitColValues.containsKey(columnName)) {
+          TypeProtos.MajorType type = Types.required(TypeProtos.MinorType.VARCHAR);
+          Statistics stat = new BinaryStatistics();
+          stat.setNumNulls(0);
+          byte[] val = implicitColValues.get(columnName).getBytes();
+          stat.setMinMaxFromBytes(val, val);
+          statMap.put(schemaPath, new ColumnStatistics(stat, type));
+        }
+      }
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Took {} ms to collect column statistics for row group", timer.elapsed(TimeUnit.MILLISECONDS));
+    }
+
+    return statMap;
+  }
+
+  private ColumnStatistics getStat(Object min, Object max, Long numNull,
+      PrimitiveType.PrimitiveTypeName primitiveType, OriginalType originalType, Integer repetitionLevel) {
+    Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
+    Statistics convertedStat = stat;
+
+    TypeProtos.MajorType type = ParquetGroupScan.getType(primitiveType, originalType);
+
+    // Change to repeated if repetitionLevel > 0
+    if (repetitionLevel != null && repetitionLevel > 0) {
+      type = TypeProtos.MajorType.newBuilder().setMinorType(type.getMinorType()).setMode(TypeProtos.DataMode.REPEATED).build();
+    }
+
+    if (numNull != null) {
+      stat.setNumNulls(numNull.longValue());
+    }
+
+    if (min != null && max != null) {
+      switch (type.getMinorType()) {
+      case INT:
+      case TIME:
+        ((IntStatistics) stat).setMinMax(((Integer) min).intValue(), ((Integer) max).intValue());
+        break;
+      case BIGINT:
+      case TIMESTAMP:
+        ((LongStatistics) stat).setMinMax(((Long) min).longValue(), ((Long) max).longValue());
+        break;
+      case FLOAT4:
+        ((FloatStatistics) stat).setMinMax(((Float) min).floatValue(), ((Float) max).floatValue());
+        break;
+      case FLOAT8:
+        ((DoubleStatistics) stat).setMinMax(((Double) min).doubleValue(), ((Double) max).doubleValue());
+        break;
+      case DATE:
+        convertedStat = new LongStatistics();
+        convertedStat.setNumNulls(stat.getNumNulls());
+        final long minMS = convertToDrillDateValue(((Integer) min).intValue());
+        final long maxMS = convertToDrillDateValue(((Integer) max).intValue());
+        ((LongStatistics) convertedStat).setMinMax(minMS, maxMS);
+        break;
+      default:
+      }
+    }
+
+    return new ColumnStatistics(convertedStat, type);
+  }
+
+  private static long convertToDrillDateValue(int dateValue) {
+    return dateValue * (long) DateTimeConstants.MILLIS_PER_DAY;
+  }
+
+}
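
With min/max collected, the pruning decision itself reduces to a range check. As an illustration of what the evaluator can conclude from these statistics (mirroring the testMainQueryFilterRegularColumn change below, where every surviving row group has o_custkey > 10), a sketch with illustrative values:

    IntStatistics s = new IntStatistics();
    s.setMinMax(11, 1000);    // o_custkey range recorded for one row group
    int constant = 10;        // from the filter: o_custkey = 10
    // an equality predicate can never match when the constant lies outside [min, max]
    boolean canDrop = constant < s.getMin() || constant > s.getMax();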

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index b1d833b..7d029ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -202,7 +202,8 @@ public class TestPartitionFilter extends PlanTestBase {
   @Test // Parquet: one side of OR has partition filter only, other side has both partition filter and non-partition filter
   public void testPartitionFilter6_Parquet_from_CTAS() throws Exception {
     String query = String.format("select * from dfs_test.tmp.parquet where (yr=1995 and o_totalprice < 40000) or yr=1996", TEST_RES_PATH);
-    testIncludeFilter(query, 8, "Filter", 46);
+    // Parquet RG filter pushdown further reduces to 6 files.
+    testIncludeFilter(query, 6, "Filter", 46);
   }
 
   @Test // Parquet: trivial case with 1 partition filter
@@ -232,13 +233,15 @@ public class TestPartitionFilter extends PlanTestBase {
   @Test // Parquet: partition filter on subdirectory only plus non-partition filter
   public void testPartitionFilter9_Parquet() throws Exception {
     String query = String.format("select * from dfs_test.`%s/multilevel/parquet` where dir1 in ('Q1','Q4') and o_totalprice < 40000", TEST_RES_PATH);
-    testIncludeFilter(query, 6, "Filter", 9);
+    // Parquet RG filter pushdown further reduces to 4 files.
+    testIncludeFilter(query, 4, "Filter", 9);
   }
 
   @Test
   public void testPartitionFilter9_Parquet_from_CTAS() throws Exception {
     String query = String.format("select * from dfs_test.tmp.parquet where qrtr in ('Q1','Q4') and o_totalprice < 40000", TEST_RES_PATH);
-    testIncludeFilter(query, 6, "Filter", 9);
+    // Parquet RG filter pushdown further reduces to 4 files.
+    testIncludeFilter(query, 4, "Filter", 9);
   }
 
   @Test
@@ -272,7 +275,8 @@ public class TestPartitionFilter extends PlanTestBase {
   public void testMainQueryFilterRegularColumn() throws Exception {
     String root = FileUtils.getResourceAsFile("/multilevel/parquet").toURI().toString();
     String query =  String.format("select * from (select dir0, o_custkey from dfs_test.`%s` where dir0='1994' and o_custkey = 10) t limit 0", root);
-    testIncludeFilter(query, 4, "Filter", 0);
+    // With Parquet RG filter pushdown, this reduces to 1 file (o_custkey all > 10).
+    testIncludeFilter(query, 1, "Filter", 0);
   }
 
   @Test // see DRILL-2852 and DRILL-3591

