drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: s..@apache.org
Subject: drill git commit: DRILL-3333: Parquet writer auto-partitioning and partition pruning
Date: Wed, 24 Jun 2015 08:30:33 GMT
Repository: drill
Updated Branches:
  refs/heads/master 3aa82bc92 -> 5a34d8194


DRILL-3333: Parquet writer auto-partitioning and partition pruning


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5a34d819
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5a34d819
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5a34d819

Branch: refs/heads/master
Commit: 5a34d8194a660f82391e1143f445a7a890340e34
Parents: 3aa82bc
Author: Steven Phillips <smp@apache.org>
Authored: Tue Jun 23 18:41:12 2015 -0700
Committer: Steven Phillips <smp@apache.org>
Committed: Wed Jun 24 00:48:01 2015 -0700

----------------------------------------------------------------------
 .../codegen/templates/AbstractRecordWriter.java |  18 +
 .../templates/EventBasedRecordWriter.java       |   8 +
 .../codegen/templates/NewValueFunctions.java    | 103 +++
 .../main/codegen/templates/RecordWriter.java    |  10 +-
 .../templates/StringOutputRecordWriter.java     |  11 +-
 .../exec/physical/base/AbstractGroupScan.java   |   6 +
 .../drill/exec/physical/base/GroupScan.java     |   7 +
 .../exec/physical/impl/WriterRecordBatch.java   |   2 +-
 .../planner/ParquetPartitionDescriptor.java     |  62 ++
 .../exec/planner/logical/DrillRuleSets.java     |   2 +
 .../logical/partition/PruneScanRule.java        | 713 ++++++++++++-------
 .../sql/handlers/CreateTableHandler.java        |   4 +-
 .../drill/exec/store/NewValueFunction.java      | 209 ++++++
 .../exec/store/easy/json/JsonRecordWriter.java  |   3 +-
 .../exec/store/parquet/ParquetGroupScan.java    | 310 +++++++-
 .../exec/store/parquet/ParquetRecordWriter.java |  42 +-
 .../exec/store/text/DrillTextRecordWriter.java  |   1 +
 17 files changed, 1237 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
index 6b6065f..5f1f42f 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+import java.lang.UnsupportedOperationException;
+
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="org/apache/drill/exec/store/AbstractRecordWriter.java" />
 <#include "/@includes/license.ftl" />
@@ -24,6 +26,8 @@ package org.apache.drill.exec.store;
 
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.BitVector.Accessor;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 import java.io.IOException;
@@ -31,6 +35,20 @@ import java.lang.UnsupportedOperationException;
 
 public abstract class AbstractRecordWriter implements RecordWriter {
 
+  private Accessor newPartitionVector;
+
+  protected void setPartitionVector(BitVector newPartitionVector) {
+    this.newPartitionVector = newPartitionVector.getAccessor();
+  }
+
+  protected boolean newPartition(int index) {
+    return newPartitionVector.get(index) == 1;
+  }
+
+  public void checkForNewPartition(int index) {
+    // no op
+  }
+
   @Override
   public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
     throw new UnsupportedOperationException("Doesn't support writing Map'");

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
index 797f3cb..cf1529d 100644
--- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+import org.apache.drill.exec.planner.physical.WriterPrel;
+
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="org/apache/drill/exec/store/EventBasedRecordWriter.java" />
 <#include "/@includes/license.ftl" />
@@ -25,6 +27,8 @@ package org.apache.drill.exec.store;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -54,6 +58,7 @@ public class EventBasedRecordWriter {
     int counter = 0;
 
     for (; counter < recordCount; counter++) {
+      recordWriter.checkForNewPartition(counter);
       recordWriter.startRecord();
       // write the current record
       for (FieldConverter converter : fieldConverters) {
@@ -73,6 +78,9 @@ public class EventBasedRecordWriter {
     try {
       int fieldId = 0;
       for (VectorWrapper w : batch) {
+        if (w.getField().getPath().equals(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD))) {
+          continue;
+        }
         FieldReader reader = w.getValueVector().getReader();
         FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getLastName(), reader);
         fieldConverters.add(converter);

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
new file mode 100644
index 0000000..b8ba4cc
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+<@pp.dropOutputFile />
+
+
+<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/GNewValueFunctions.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.expr.fn.impl;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.*;
+import javax.inject.Inject;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.record.RecordBatch;
+
+public class GNewValueFunctions {
+<#list vv.types as type>
+<#if type.major == "Fixed">
+
+<#list type.minor as minor>
+<#list vv.modes as mode>
+  <#if mode.name != "Repeated">
+
+<#if !minor.class.startsWith("Decimal28") && !minor.class.startsWith("Decimal38") && !minor.class.startsWith("Interval")>
+@SuppressWarnings("unused")
+@FunctionTemplate(name = "newPartitionValue", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
+public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleFunc{
+
+  @Param ${mode.prefix}${minor.class}Holder in;
+  @Workspace ${mode.prefix}${minor.class}Holder previous;
+  @Workspace Boolean initialized;
+  @Output BitHolder out;
+
+  public void setup() {
+    initialized = false;
+  }
+
+  <#if mode.name == "Required">
+  public void eval() {
+    if (initialized) {
+      if (in.value == previous.value) {
+        out.value = 0;
+      } else {
+        previous.value = in.value;
+        out.value = 1;
+      }
+    } else {
+      previous.value = in.value;
+      out.value = 1;
+      initialized = true;
+    }
+  }
+  </#if>
+  <#if mode.name == "Optional">
+  public void eval() {
+    if (initialized) {
+      if (in.isSet == 0 && previous.isSet == 0) {
+        out.value = 0;
+      } else if (in.value == previous.value) {
+        out.value = 0;
+      } else {
+        previous.value = in.value;
+        previous.isSet = in.isSet;
+        out.value = 1;
+      }
+    } else {
+      previous.value = in.value;
+      previous.isSet = in.isSet;
+      out.value = 1;
+      initialized = true;
+    }
+  }
+  </#if>
+}
+</#if> <#-- minor.class.startWith -->
+
+</#if> <#-- mode.name -->
+</#list>
+</#list>
+</#if> <#-- type.major -->
+</#list>
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/RecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
index c6325fd..a37ffa8 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
@@ -23,6 +23,7 @@ package org.apache.drill.exec.store;
 
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
@@ -43,10 +44,15 @@ public interface RecordWriter {
 
   /**
    * Update the schema in RecordWriter. Called at least once before starting writing the records.
-   * @param schema
+   * @param batch
    * @throws IOException
    */
-  void updateSchema(BatchSchema schema) throws IOException;
+  void updateSchema(VectorAccessible batch) throws IOException;
+
+  /**
+   * Check if the writer should start a new partition, and if so, start a new partition
+   */
+  public void checkForNewPartition(int index);
 
   /**
    * Called before starting writing fields in a record.

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index f704cca..c175900 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+import org.apache.drill.exec.store.AbstractRecordWriter;
+
 import java.lang.Override;
 import java.lang.UnsupportedOperationException;
 
@@ -31,6 +33,7 @@ import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.vector.*;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -48,14 +51,16 @@ import java.util.Map;
  *
  * This is useful for text format writers such as CSV, TSV etc.
  */
-public abstract class StringOutputRecordWriter implements RecordWriter {
+public abstract class StringOutputRecordWriter extends AbstractRecordWriter {
 
   private final BufferAllocator allocator;
   protected StringOutputRecordWriter(BufferAllocator allocator){
     this.allocator = allocator;
   }
-  
-  public void updateSchema(BatchSchema schema) throws IOException {
+
+  @Override
+  public void updateSchema(VectorAccessible batch) throws IOException {
+    BatchSchema schema = batch.getSchema();
     List<String> columnNames = Lists.newArrayList();
     for (int i=0; i < schema.getFieldCount(); i++) {
       columnNames.add(schema.getColumn(i).getLastName());

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 5c4ee4d..1277ec4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -122,4 +123,9 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
   public int getOperatorType() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public List<SchemaPath> getPartitionColumns() {
+    return Lists.newArrayList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 2d16cd0..946c7e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -91,4 +91,11 @@ public interface GroupScan extends Scan, HasAffinity{
    */
   public boolean supportsPartitionFilterPushdown();
 
+  /**
+   * Returns a list of columns that can be used for partition pruning
+   *
+   */
+  @JsonIgnore
+  public List<SchemaPath> getPartitionColumns();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index d5d64a7..5fe7667 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -150,7 +150,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     try {
       // update the schema in RecordWriter
       stats.startSetup();
-      recordWriter.updateSchema(incoming.getSchema());
+      recordWriter.updateSchema(incoming);
       // Create two vectors for:
       //   1. Fragment unique id.
       //   2. Summary: currently contains number of records written.

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
new file mode 100644
index 0000000..127e70a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * PartitionDescriptor that describes partitions based on column names instead of directory structure
+ */
+public class ParquetPartitionDescriptor implements PartitionDescriptor {
+
+  private final List<SchemaPath> partitionColumns;
+
+  public ParquetPartitionDescriptor(List<SchemaPath> partitionColumns) {
+    this.partitionColumns = partitionColumns;
+  }
+
+  @Override
+  public int getPartitionHierarchyIndex(String partitionName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isPartitionName(String name) {
+    return partitionColumns.contains(name);
+  }
+
+  @Override
+  public Integer getIdIfValid(String name) {
+    SchemaPath schemaPath = SchemaPath.getSimplePath(name);
+    int id = partitionColumns.indexOf(schemaPath);
+    if (id == -1) {
+      return null;
+    }
+    return id;
+  }
+
+  @Override
+  public int getMaxHierarchyLevel() {
+    return partitionColumns.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index d9b1354..daa7276 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -155,6 +155,8 @@ public class DrillRuleSets {
 
       PruneScanRule.getFilterOnProject(context),
       PruneScanRule.getFilterOnScan(context),
+      PruneScanRule.getFilterOnProjectParquet(context),
+      PruneScanRule.getFilterOnScanParquet(context),
 
       /*
        Convert from Calcite Logical to Drill Logical Rules.

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 2544d34..c8c7db6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -1,84 +1,214 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 package org.apache.drill.exec.planner.logical.partition;
 
+import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.util.BitSets;
-
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.base.FileGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
-import org.apache.drill.exec.planner.logical.DrillFilterRel;
-import org.apache.drill.exec.planner.logical.DrillOptiq;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.planner.logical.DrillScanRel;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.store.dfs.FileSelection;
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+
+ import org.apache.calcite.rex.RexUtil;
+ import org.apache.calcite.util.BitSets;
+
+ import org.apache.drill.common.expression.ErrorCollectorImpl;
+ import org.apache.drill.common.expression.LogicalExpression;
+ import org.apache.drill.common.expression.SchemaPath;
+ import org.apache.drill.common.types.TypeProtos.MajorType;
+ import org.apache.drill.common.types.TypeProtos.MinorType;
+ import org.apache.drill.common.types.Types;
+ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+ import org.apache.drill.exec.expr.TypeHelper;
+ import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
+ import org.apache.drill.exec.memory.BufferAllocator;
+ import org.apache.drill.exec.ops.QueryContext;
+ import org.apache.drill.exec.physical.base.FileGroupScan;
+ import org.apache.drill.exec.physical.base.GroupScan;
+ import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
+import org.apache.drill.exec.planner.ParquetPartitionDescriptor;
+import org.apache.drill.exec.planner.PartitionDescriptor;
+ import org.apache.drill.exec.planner.logical.DrillFilterRel;
+ import org.apache.drill.exec.planner.logical.DrillOptiq;
+ import org.apache.drill.exec.planner.logical.DrillParseContext;
+ import org.apache.drill.exec.planner.logical.DrillProjectRel;
+ import org.apache.drill.exec.planner.logical.DrillRel;
+ import org.apache.drill.exec.planner.logical.DrillScanRel;
+ import org.apache.drill.exec.planner.logical.RelOptHelper;
+ import org.apache.drill.exec.planner.physical.PlannerSettings;
+ import org.apache.drill.exec.planner.physical.PrelUtil;
+ import org.apache.drill.exec.record.MaterializedField;
+ import org.apache.drill.exec.record.VectorContainer;
+ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rex.RexNode;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+ import org.apache.drill.exec.vector.NullableVarCharVector;
+ import org.apache.calcite.rel.RelNode;
+ import org.apache.calcite.plan.RelOptRule;
+ import org.apache.calcite.plan.RelOptRuleCall;
+ import org.apache.calcite.plan.RelOptRuleOperand;
+ import org.apache.calcite.plan.RelOptUtil;
+ import org.apache.calcite.rex.RexNode;
+
+ import com.google.common.base.Charsets;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import org.apache.drill.exec.vector.ValueVector;
 
 public abstract class PruneScanRule extends RelOptRule {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
+   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
+
+   public static final RelOptRule getFilterOnProject(QueryContext context){
+       return new PruneScanRule(
+           RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
+           "PruneScanRule:Filter_On_Project",
+           context) {
+
+       @Override
+         public boolean matches(RelOptRuleCall call) {
+           final DrillScanRel scan = (DrillScanRel) call.rel(2);
+           GroupScan groupScan = scan.getGroupScan();
+           // this rule is applicable only for dfs based partition pruning
+           return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+         }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+         final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+         final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1);
+         final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
+         doOnMatch(call, filterRel, projectRel, scanRel);
+       };
+
+         @Override
+         protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+           return new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
+         }
+
+         @Override
+         protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) {
+           int record = 0;
+           for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+             final PathPartition partition = iter.next();
+             for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+               if(partition.dirs[partitionColumnIndex] == null){
+                 ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record);
+               }else{
+                 byte[] bytes = partition.dirs[partitionColumnIndex].getBytes(Charsets.UTF_8);
+                 ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length);
+               }
+             }
+           }
+
+           for(ValueVector v : vectors){
+             if(v == null){
+               continue;
+             }
+             v.getMutator().setValueCount(partitions.size());
+           }
+         }
+
+         @Override
+         protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) {
+           return Types.optional(MinorType.VARCHAR);
+         }
+
+         @Override
+         protected List<String> getFiles(DrillScanRel scanRel) {
+           return ((FormatSelection)scanRel.getDrillTable().getSelection()).getAsFiles();
+         }
+       };
+   }
+
+   public static final RelOptRule getFilterOnScan(QueryContext context){
+     return new PruneScanRule(
+           RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
+           "PruneScanRule:Filter_On_Scan", context) {
+
+       @Override
+         public boolean matches(RelOptRuleCall call) {
+           final DrillScanRel scan = (DrillScanRel) call.rel(1);
+           GroupScan groupScan = scan.getGroupScan();
+           // this rule is applicable only for dfs based partition pruning
+           return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
+         }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+         final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+         final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+         doOnMatch(call, filterRel, null, scanRel);
+       }
+
+       @Override
+       protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+         return new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
+       }
+
+       @Override
+       protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) {
+         int record = 0;
+         for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+           final PathPartition partition = iter.next();
+           for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+             if(partition.dirs[partitionColumnIndex] == null){
+               ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record);
+             }else{
+               byte[] bytes = partition.dirs[partitionColumnIndex].getBytes(Charsets.UTF_8);
+               ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length);
+             }
+           }
+         }
+
+         for(ValueVector v : vectors){
+           if(v == null){
+             continue;
+           }
+           v.getMutator().setValueCount(partitions.size());
+         }
+       }
+
+       @Override
+        protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) {
+          return Types.optional(MinorType.VARCHAR);
+        }
+
+       @Override
+       protected List<String> getFiles(DrillScanRel scanRel) {
+         return ((FormatSelection)scanRel.getDrillTable().getSelection()).getAsFiles();
+       }
+     };
+   }
 
-  public static final RelOptRule getFilterOnProject(QueryContext context){
-      return new PruneScanRule(
-          RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
-          "PruneScanRule:Filter_On_Project",
-          context) {
+  public static final RelOptRule getFilterOnProjectParquet(QueryContext context){ // Filter over Project over Scan
+    return new PruneScanRule(
+        RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
+        "PruneScanRule:Filter_On_Project_Parquet",
+        context) {
 
       @Override
-        public boolean matches(RelOptRuleCall call) {
-          final DrillScanRel scan = (DrillScanRel) call.rel(2);
-          GroupScan groupScan = scan.getGroupScan();
-          // this rule is applicable only for dfs based partition pruning
-          return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
-        }
+      public boolean matches(RelOptRuleCall call) {
+        final DrillScanRel scan = (DrillScanRel) call.rel(2);
+        GroupScan groupScan = scan.getGroupScan();
+        // only Parquet scans with partition filter pushdown: getFiles() and populatePartitionVectors() cast to ParquetGroupScan
+        return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
+      }
 
       @Override
       public void onMatch(RelOptRuleCall call) {
@@ -87,222 +217,291 @@ public abstract class PruneScanRule extends RelOptRule {
         final DrillScanRel scanRel = (DrillScanRel) call.rel(2);
         doOnMatch(call, filterRel, projectRel, scanRel);
       };
-    };
-  }
-
-  public static final RelOptRule getFilterOnScan(QueryContext context){
-    return new PruneScanRule(
-          RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
-          "PruneScanRule:Filter_On_Scan", context) {
-
-      @Override
-        public boolean matches(RelOptRuleCall call) {
-          final DrillScanRel scan = (DrillScanRel) call.rel(1);
-          GroupScan groupScan = scan.getGroupScan();
-          // this rule is applicable only for dfs based partition pruning
-          return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown();
-        }
 
       @Override
-      public void onMatch(RelOptRuleCall call) {
-        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
-        final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
-        doOnMatch(call, filterRel, null, scanRel);
+      protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+        return new ParquetPartitionDescriptor(scanRel.getGroupScan().getPartitionColumns());
       }
-    };
-  }
-
-  final QueryContext context;
-
-  private PruneScanRule(RelOptRuleOperand operand, String id, QueryContext context) {
-    super(operand, id);
-    this.context = context;
-  }
 
-  protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
-    final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    FileSystemPartitionDescriptor descriptor = new FileSystemPartitionDescriptor(settings.getFsPartitionColumnLabel());
-    final BufferAllocator allocator = context.getAllocator();
-
-
-    RexNode condition = null;
-    if(projectRel == null){
-      condition = filterRel.getCondition();
-    }else{
-      // get the filter as if it were below the projection.
-      condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
-    }
-
-    Map<Integer, String> dirNames = Maps.newHashMap();
-    List<String> fieldNames = scanRel.getRowType().getFieldNames();
-    BitSet columnBitset = new BitSet();
-    BitSet dirBitset = new BitSet();
-    {
-      int colIndex = 0;
-      for(String field : fieldNames){
-        final Integer dirIndex = descriptor.getIdIfValid(field);
-        if(dirIndex != null){
-          dirNames.put(dirIndex, field);
-          dirBitset.set(dirIndex);
-          columnBitset.set(colIndex);
+      @Override
+      protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) {
+        int record = 0;
+        for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+          final PathPartition partition = iter.next();
+          for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+            SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+            ((ParquetGroupScan)groupScan).populatePruningVector(vectors[partitionColumnIndex], record, column, partition.file); // one value per (file, column)
+          }
         }
-        colIndex++;
-      }
-    }
-
-    if(dirBitset.isEmpty()){
-      return;
-    }
-
-    FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
-    c.analyze(condition);
-    RexNode pruneCondition = c.getFinalCondition();
-
-    if(pruneCondition == null){
-      return;
-    }
-
-    // set up the partitions
-    final FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection();
-    final List<String> files = origSelection.getAsFiles();
-    final String selectionRoot = origSelection.getSelection().selectionRoot;
-    List<PathPartition> partitions = Lists.newLinkedList();
-
-    // let's only deal with one batch of files for now.
-    if(files.size() > Character.MAX_VALUE){
-      return;
-    }
-
-    for(String f : files){
-      partitions.add(new PathPartition(descriptor.getMaxHierarchyLevel(), selectionRoot, f));
-    }
-
-    final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
-    final VectorContainer container = new VectorContainer();
-
-    try{
-      final NullableVarCharVector[] vectors = new NullableVarCharVector[descriptor.getMaxHierarchyLevel()];
-      for(int dirIndex : BitSets.toIter(dirBitset)){
-        NullableVarCharVector vector = new NullableVarCharVector(MaterializedField.create(dirNames.get(dirIndex), Types.optional(MinorType.VARCHAR)), allocator);
-        vector.allocateNew(5000, partitions.size());
-        vectors[dirIndex] = vector;
-        container.add(vector);
-      }
 
-      // populate partition vectors.
-      int record = 0;
-      for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
-        final PathPartition partition = iter.next();
-        for(int dirIndex : BitSets.toIter(dirBitset)){
-          if(partition.dirs[dirIndex] == null){
-            vectors[dirIndex].getMutator().setNull(record);
-          }else{
-            byte[] bytes = partition.dirs[dirIndex].getBytes(Charsets.UTF_8);
-            vectors[dirIndex].getMutator().setSafe(record, bytes, 0, bytes.length);
+        for(ValueVector v : vectors){ // slots of partition columns not referenced by the filter are null
+          if(v == null){
+            continue;
           }
+          v.getMutator().setValueCount(partitions.size());
         }
       }
 
-      for(NullableVarCharVector v : vectors){
-        if(v == null){
-          continue;
-        }
-        v.getMutator().setValueCount(partitions.size());
+      @Override
+      protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) {
+        return ((ParquetGroupScan)groupScan).getTypeForColumn(column);
       }
 
-
-      // materialize the expression
-      logger.debug("Attempting to prune {}", pruneCondition);
-      LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
-      ErrorCollectorImpl errors = new ErrorCollectorImpl();
-      LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, context.getFunctionRegistry());
-      if (errors.getErrorCount() != 0) {
-        logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
+      @Override
+      protected List<String> getFiles(DrillScanRel scanRel) {
+        ParquetGroupScan groupScan = (ParquetGroupScan) scanRel.getGroupScan();
+        return new ArrayList<String>(groupScan.getFileSet()); // typed copy; avoids the raw-type unchecked conversion
       }
+    };
+  }
 
-      output.allocateNew(partitions.size());
-      InterpreterEvaluator.evaluate(partitions.size(), context, container, output, materializedExpr);
-      record = 0;
-
-      List<String> newFiles = Lists.newArrayList();
-      for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
-        PathPartition part = iter.next();
-        if(!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1){
-          newFiles.add(part.file);
-        }
-      }
+  // Separate rules handle pruning on Parquet column values. In the future they may be combined with the file-system
+  // (directory) pruning rules into a single rule that handles both kinds of pruning.
 
-      boolean canDropFilter = true;
+  public static final RelOptRule getFilterOnScanParquet(QueryContext context){ // Filter directly over Scan
+    return new PruneScanRule(
+        RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
+        "PruneScanRule:Filter_On_Scan_Parquet", context) {
 
-      if(newFiles.isEmpty()){
-        newFiles.add(files.get(0));
-        canDropFilter = false;
+      @Override
+      public boolean matches(RelOptRuleCall call) {
+        final DrillScanRel scan = (DrillScanRel) call.rel(1);
+        GroupScan groupScan = scan.getGroupScan();
+        // only Parquet scans with partition filter pushdown: getFiles() and populatePartitionVectors() cast to ParquetGroupScan
+        return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
       }
 
-      if(newFiles.size() == files.size()){
-        return;
+      @Override
+      public void onMatch(RelOptRuleCall call) {
+        final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0);
+        final DrillScanRel scanRel = (DrillScanRel) call.rel(1);
+        doOnMatch(call, filterRel, null, scanRel);
       }
 
-      logger.debug("Pruned {} => {}", files, newFiles);
-
-      List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
-      List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
-      conjuncts.removeAll(pruneConjuncts);
-      RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
-
-      final FileSelection newFileSelection = new FileSelection(newFiles, origSelection.getSelection().selectionRoot, true);
-      final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
-      final DrillScanRel newScanRel =
-          new DrillScanRel(scanRel.getCluster(),
-              scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              scanRel.getTable(),
-              newScan,
-              scanRel.getRowType(),
-              scanRel.getColumns());
+      @Override
+      protected PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+        return new ParquetPartitionDescriptor(scanRel.getGroupScan().getPartitionColumns());
+      }
 
-      RelNode inputRel = newScanRel;
+      @Override
+      protected void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan) {
+        int record = 0;
+        for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+          final PathPartition partition = iter.next();
+          for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+            SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+            ((ParquetGroupScan)groupScan).populatePruningVector(vectors[partitionColumnIndex], record, column, partition.file); // one value per (file, column)
+          }
+        }
 
-      if(projectRel != null){
-        inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
+        for(ValueVector v : vectors){ // slots of partition columns not referenced by the filter are null
+          if(v == null){
+            continue;
+          }
+          v.getMutator().setValueCount(partitions.size());
+        }
       }
 
-      if (newCondition.isAlwaysTrue() && canDropFilter) {
-        call.transformTo(inputRel);
-      } else {
-        final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
-        call.transformTo(newFilter);
+      @Override
+      protected MajorType getVectorType(GroupScan groupScan, SchemaPath column) {
+        return ((ParquetGroupScan)groupScan).getTypeForColumn(column);
       }
 
-    }catch(Exception e){
-      logger.warn("Exception while trying to prune partition.", e);
-    }finally{
-      container.clear();
-      if(output !=null){
-        output.clear();
+      @Override
+      protected List<String> getFiles(DrillScanRel scanRel) {
+        ParquetGroupScan groupScan = (ParquetGroupScan) scanRel.getGroupScan();
+        return new ArrayList<String>(groupScan.getFileSet()); // typed copy; avoids the raw-type unchecked conversion
       }
-    }
+    };
   }
 
-  private static class PathPartition {
-    final String[] dirs;
-    final String file;
-
-    public PathPartition(int max, String selectionRoot, String file){
-      this.file = file;
-      int start = file.indexOf(selectionRoot) + selectionRoot.length();
-      String postPath = file.substring(start);
-      if(postPath.charAt(0) == '/'){
-        postPath = postPath.substring(1);
-      }
-      String[] mostDirs = postPath.split("/");
-      this.dirs = new String[max];
-      int maxLoop = Math.min(max, mostDirs.length - 1);
-      for(int i =0; i < maxLoop; i++){
-        this.dirs[i] = mostDirs[i];
-      }
-    }
+   final QueryContext context;
+
+   private PruneScanRule(RelOptRuleOperand operand, String id, QueryContext context) {
+     super(operand, id);
+     this.context = context;
+   }
+
+   /** Returns the descriptor whose getIdIfValid() decides which scan fields are partition columns. */
+   protected abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel);
+
+   /**
+    * Evaluates the partition-column part of the filter once per file and, when some files can be skipped,
+    * transforms the matched rels into a scan over the remaining files. Pruning is best-effort: exceptions
+    * thrown while setting up or evaluating the partitions are logged and leave the plan unchanged.
+    */
+   protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) {
+     final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+     PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
+     final BufferAllocator allocator = context.getAllocator();
+
+     RexNode condition = null;
+     if(projectRel == null){
+       condition = filterRel.getCondition();
+     }else{
+       // get the filter as if it were below the projection.
+       condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel);
+     }
+
+     // fieldNameMap: partition id -> field name; columnBitset: row-type positions of the partition columns.
+     Map<Integer, String> fieldNameMap = Maps.newHashMap();
+     List<String> fieldNames = scanRel.getRowType().getFieldNames();
+     BitSet columnBitset = new BitSet();
+     BitSet partitionColumnBitSet = new BitSet();
+     {
+       int relColIndex = 0;
+       for(String field : fieldNames){
+         final Integer partitionIndex = descriptor.getIdIfValid(field);
+         if(partitionIndex != null){
+           fieldNameMap.put(partitionIndex, field);
+           partitionColumnBitSet.set(partitionIndex);
+           columnBitset.set(relColIndex);
+         }
+         relColIndex++;
+       }
+     }
+
+     if(partitionColumnBitSet.isEmpty()){
+       return;
+     }
+
+     // Sub-condition over the partition columns in columnBitset; null means there is nothing to prune on.
+     FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
+     c.analyze(condition);
+     RexNode pruneCondition = c.getFinalCondition();
+     if(pruneCondition == null){
+       return;
+     }
+
+     final GroupScan groupScan = scanRel.getGroupScan();
+     final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
+     final VectorContainer container = new VectorContainer();
+     try{
+       // set up the partitions inside the try, so an unexpected selection or file path skips pruning
+       final FormatSelection origSelection = (FormatSelection)scanRel.getDrillTable().getSelection();
+       final List<String> files = getFiles(scanRel);
+       final String selectionRoot = origSelection.getSelection().selectionRoot;
+       // nothing to prune without files; and let's only deal with one batch of files for now.
+       if(files.isEmpty() || files.size() > Character.MAX_VALUE){
+         return;
+       }
+       final List<PathPartition> partitions = Lists.newArrayListWithCapacity(files.size());
+       for(String f : files){
+         partitions.add(new PathPartition(descriptor.getMaxHierarchyLevel(), selectionRoot, f));
+       }
+
+       // one vector per referenced partition column, indexed by partition id; other slots stay null
+       final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
+       for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){
+         SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+         MajorType type = getVectorType(groupScan, column);
+         MaterializedField field = MaterializedField.create(column, type);
+         ValueVector v = TypeHelper.getNewVector(field, allocator);
+         v.allocateNew();
+         vectors[partitionColumnIndex] = v;
+         container.add(v);
+       }
+
+       // populate partition vectors.
+       populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap, groupScan);
+
+       // materialize the expression
+       logger.debug("Attempting to prune {}", pruneCondition);
+       LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
+       ErrorCollectorImpl errors = new ErrorCollectorImpl();
+       LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, context.getFunctionRegistry());
+       if (errors.getErrorCount() != 0) {
+         logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
+         return; // do not evaluate an expression that failed to materialize
+       }
+
+       // evaluate once per file; a file is kept only when the result is non-null and true
+       output.allocateNew(partitions.size());
+       InterpreterEvaluator.evaluate(partitions.size(), context, container, output, materializedExpr);
+       int record = 0;
+       List<String> newFiles = Lists.newArrayList();
+       for(Iterator<PathPartition> iter = partitions.iterator(); iter.hasNext(); record++){
+         PathPartition part = iter.next();
+         if(!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1){
+           newFiles.add(part.file);
+         }
+       }
+       // if no file matched, still scan the first file but never drop the filter
+       boolean canDropFilter = true;
+       if(newFiles.isEmpty()){
+         newFiles.add(files.get(0));
+         canDropFilter = false;
+       }
+       // nothing was pruned: leave the plan unchanged
+       if(newFiles.size() == files.size()){
+         return;
+       }
+
+       logger.debug("Pruned {} => {}", files, newFiles);
+       // the remaining filter is the original condition minus the conjuncts used for pruning
+       List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
+       List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
+       conjuncts.removeAll(pruneConjuncts);
+       RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);
+
+       final FileSelection newFileSelection = new FileSelection(newFiles, selectionRoot, true);
+       final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
+       final DrillScanRel newScanRel =
+           new DrillScanRel(scanRel.getCluster(),
+               scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+               scanRel.getTable(),
+               newScan,
+               scanRel.getRowType(),
+               scanRel.getColumns());
+
+       RelNode inputRel = newScanRel;
+       if(projectRel != null){
+         inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
+       }
+
+       if (newCondition.isAlwaysTrue() && canDropFilter) {
+         call.transformTo(inputRel);
+       } else {
+         final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
+         call.transformTo(newFilter);
+       }
+     }catch(Exception e){
+       logger.warn("Exception while trying to prune partition.", e);
+     }finally{
+       container.clear();
+       if(output !=null){
+         output.clear();
+       }
+     }
+   }
+
+   /** Fills vectors[i], for each bit i of partitionColumnBitSet, with one value per partition in list order. */
+   protected abstract void populatePartitionVectors(ValueVector[] vectors, List<PathPartition> partitions, BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap, GroupScan groupScan);
+   /** Returns the vector type used to hold the values of the given partition column. */
+   protected abstract MajorType getVectorType(GroupScan groupScan, SchemaPath column);
+   /** Returns the files of the scan that are candidates for pruning. */
+   protected abstract List<String> getFiles(DrillScanRel scanRel);
+
+   private static class PathPartition {
+        final String[] dirs; // dirs[i]: i-th directory below selectionRoot, or null past the file's depth
+        final String file;
+        // postPath is empty when file equals selectionRoot; that case yields no directories.
+        public PathPartition(int max, String selectionRoot, String file){
+          this.file = file;
+          this.dirs = new String[max];
+          int start = file.indexOf(selectionRoot) + selectionRoot.length();
+          String postPath = file.substring(start);
+          // startsWith() is safe on an empty postPath, where charAt(0) would throw
+          postPath = postPath.startsWith("/") ? postPath.substring(1) : postPath;
+          // the last path element is the file name, so only the elements before it are directories
+          String[] mostDirs = postPath.split("/");
+          int maxLoop = Math.min(max, mostDirs.length - 1);
+          for(int i = 0; i < maxLoop; i++){
+            this.dirs[i] = mostDirs[i];
+          }
+        }
 
 
-  }
+      }
 
-}
+ }

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index 1e63748..6dda1a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -39,6 +40,7 @@ import org.apache.calcite.tools.ValidationException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScreenRel;
 import org.apache.drill.exec.planner.logical.DrillWriterRel;
@@ -159,7 +161,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
     @Override
     public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException {
 
-      final Prel child = ((Prel)prel.getInput()).accept(this, null);
+      final Prel child = ((Prel) prel.getInput()).accept(this, null);
 
       final RelDataType childRowType = child.getRowType();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
new file mode 100644
index 0000000..fedb473
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+import javax.inject.Inject;
+
+/**
+ * Hand-written {@code newPartitionValue} functions for the variable-length types VARCHAR and VARBINARY, in both
+ * required and nullable form. The fixed-width variants come from a FreeMarker template; code generation buys
+ * little for variable-length types, so these are written by hand.
+ *
+ * <p>Each function outputs 1 on its first call and whenever its input differs from the input of the previous
+ * call, and 0 otherwise. In the nullable variants null is a value of its own that equals only another null.
+ *
+ * <p>The previous value is copied into the injected {@code buf}. {@code reallocIfNeeded} may hand back a different
+ * buffer, so its result is always stored back into {@code buf}; otherwise later calls would size and write through
+ * a stale reference.
+ */
+public class NewValueFunction {
+
+  @FunctionTemplate(name = "newPartitionValue",
+      scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls = NullHandling.INTERNAL)
+  public static class NewValueVarChar implements DrillSimpleFunc {
+
+    @Param VarCharHolder in;
+    @Workspace VarCharHolder previous; // copy of the last value seen, stored in buf from offset 0
+    @Workspace Boolean initialized;
+    @Output BitHolder out;
+    @Inject DrillBuf buf;
+
+    public void setup() {
+      initialized = false;
+      previous.buffer = buf;
+      previous.start = 0;
+    }
+
+    public void eval() {
+      int length = in.end - in.start;
+
+      if (initialized && org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
+          previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) {
+        out.value = 0;
+      } else {
+        // remember the new value; reassign buf since reallocIfNeeded may return a different buffer
+        previous.buffer = buf = buf.reallocIfNeeded(length);
+        previous.buffer.setBytes(0, in.buffer, in.start, length);
+        previous.end = length;
+        out.value = 1;
+        initialized = true;
+      }
+    }
+  }
+
+  @FunctionTemplate(name = "newPartitionValue",
+      scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls = NullHandling.INTERNAL)
+  public static class NewValueVarCharNullable implements DrillSimpleFunc {
+
+    @Param NullableVarCharHolder in;
+    @Workspace NullableVarCharHolder previous; // last value seen; its bytes are meaningful only when isSet == 1
+    @Workspace Boolean initialized;
+    @Output BitHolder out;
+    @Inject DrillBuf buf;
+
+    public void setup() {
+      initialized = false;
+      previous.buffer = buf;
+      previous.start = 0;
+    }
+
+    public void eval() {
+      boolean same = false;
+      if (initialized) {
+        if (previous.isSet == 0 || in.isSet == 0) {
+          // null equals only null; the bytes of a null holder are never compared
+          same = previous.isSet == in.isSet;
+        } else {
+          same = org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
+              previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0;
+        }
+      }
+
+      if (same) {
+        out.value = 0;
+      } else {
+        if (in.isSet == 1) {
+          int length = in.end - in.start;
+          // reassign buf since reallocIfNeeded may return a different buffer
+          previous.buffer = buf = buf.reallocIfNeeded(length);
+          previous.buffer.setBytes(0, in.buffer, in.start, length);
+          previous.end = length;
+        }
+        previous.isSet = in.isSet;
+        out.value = 1;
+        initialized = true;
+      }
+    }
+  }
+
+  @FunctionTemplate(name = "newPartitionValue",
+      scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls = NullHandling.INTERNAL)
+  public static class NewValueVarBinary implements DrillSimpleFunc {
+
+    @Param VarBinaryHolder in;
+    @Workspace VarBinaryHolder previous; // copy of the last value seen, stored in buf from offset 0
+    @Workspace Boolean initialized;
+    @Output BitHolder out;
+    @Inject DrillBuf buf;
+
+    public void setup() {
+      initialized = false;
+      previous.buffer = buf;
+      previous.start = 0;
+    }
+
+    public void eval() {
+      int length = in.end - in.start;
+
+      if (initialized && org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
+          previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) {
+        out.value = 0;
+      } else {
+        // remember the new value; reassign buf since reallocIfNeeded may return a different buffer
+        previous.buffer = buf = buf.reallocIfNeeded(length);
+        previous.buffer.setBytes(0, in.buffer, in.start, length);
+        previous.end = length;
+        out.value = 1;
+        initialized = true;
+      }
+    }
+  }
+
+  @FunctionTemplate(name = "newPartitionValue",
+      scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls = NullHandling.INTERNAL)
+  public static class NewValueVarBinaryNullable implements DrillSimpleFunc {
+
+    @Param NullableVarBinaryHolder in;
+    @Workspace NullableVarBinaryHolder previous; // last value seen; its bytes are meaningful only when isSet == 1
+    @Workspace Boolean initialized;
+    @Output BitHolder out;
+    @Inject DrillBuf buf;
+
+    public void setup() {
+      initialized = false;
+      previous.buffer = buf;
+      previous.start = 0;
+    }
+
+    public void eval() {
+      boolean same = false;
+      if (initialized) {
+        if (previous.isSet == 0 || in.isSet == 0) {
+          // null equals only null; the bytes of a null holder are never compared
+          same = previous.isSet == in.isSet;
+        } else {
+          same = org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
+              previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0;
+        }
+      }
+
+      if (same) {
+        out.value = 0;
+      } else {
+        if (in.isSet == 1) {
+          int length = in.end - in.start;
+          // reassign buf since reallocIfNeeded may return a different buffer
+          previous.buffer = buf = buf.reallocIfNeeded(length);
+          previous.buffer.setBytes(0, in.buffer, in.start, length);
+          previous.end = length;
+        }
+        previous.isSet = in.isSet;
+        out.value = 1;
+        initialized = true;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index a43a4a0..ea45653 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.JSONOutputRecordWriter;
@@ -90,7 +91,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   }
 
   @Override
-  public void updateSchema(BatchSchema schema) throws IOException {
+  public void updateSchema(VectorAccessible batch) throws IOException { // the batch is not inspected
     // no op
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index cf39518..ec28833 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -19,18 +19,28 @@ package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.IntervalHolder;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
@@ -41,6 +51,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -54,14 +65,44 @@ import org.apache.drill.exec.store.schedule.BlockMapBuilder;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal18Vector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableIntervalVector;
+import org.apache.drill.exec.vector.NullableSmallIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableTinyIntVector;
+import org.apache.drill.exec.vector.NullableUInt1Vector;
+import org.apache.drill.exec.vector.NullableUInt2Vector;
+import org.apache.drill.exec.vector.NullableUInt4Vector;
+import org.apache.drill.exec.vector.NullableUInt8Vector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.security.UserGroupInformation;
+import org.joda.time.DateTimeUtils;
+import parquet.column.statistics.Statistics;
+import parquet.format.ConvertedType;
+import parquet.format.FileMetaData;
+import parquet.format.SchemaElement;
+import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileWriter;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.io.api.Binary;
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
 import com.codahale.metrics.MetricRegistry;
@@ -74,6 +115,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
 
 @JsonTypeName("parquet-scan")
 public class ParquetGroupScan extends AbstractFileGroupScan {
@@ -169,6 +213,9 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.rowGroupInfos = that.rowGroupInfos == null ? null : Lists.newArrayList(that.rowGroupInfos);
     this.selectionRoot = that.selectionRoot;
     this.columnValueCounts = that.columnValueCounts;
+    this.columnTypeMap = that.columnTypeMap;
+    this.partitionValueMap = that.partitionValueMap;
+    this.fileSet = that.fileSet;
   }
 
 
@@ -214,6 +261,12 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
+  // Every file (scheme and authority stripped) that contributed at least one row group while reading footers.
+  private Set<String> fileSet = Sets.newHashSet();
+
+  /** @return the scan's live file set itself (not a copy) */
+  public Set<String> getFileSet() { return fileSet; }
+
   private void readFooterHelper(List<FileStatus> statuses) throws IOException {
     watch.reset();
     watch.start();
@@ -227,10 +280,19 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     ColumnChunkMetaData columnChunkMetaData;
 
     List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
+    boolean first = true;
+    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
     for (Footer footer : footers) {
       int index = 0;
       ParquetMetadata metadata = footer.getParquetMetadata();
+      FileMetaData fileMetaData = metadataConverter.toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, metadata);
+      HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+      for (SchemaElement se : fileMetaData.getSchema()) {
+        schemaElements.put(se.getName(), se);
+      }
       for (BlockMetaData rowGroup : metadata.getBlocks()) {
+        String file = Path.getPathWithoutSchemeAndAuthority(footer.getFile()).toString();
+        fileSet.add(file);
         long valueCountInGrp = 0;
         // need to grab block information from HDFS
         columnChunkMetaData = rowGroup.getColumns().iterator().next();
@@ -242,28 +304,49 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         for (ColumnChunkMetaData col : rowGroup.getColumns()) {
           length += col.getTotalSize();
           valueCountInGrp = Math.max(col.getValueCount(), valueCountInGrp);
-          SchemaPath path = SchemaPath.getSimplePath(col.getPath().toString().replace("[", "").replace("]", "").toLowerCase());
+          SchemaPath schemaPath = SchemaPath.getSimplePath(col.getPath().toString().replace("[", "").replace("]", "").toLowerCase());
 
           long previousCount = 0;
           long currentCount = 0;
 
-          if (! columnValueCounts.containsKey(path)) {
+          if (! columnValueCounts.containsKey(schemaPath)) {
             // create an entry for this column
-            columnValueCounts.put(path, previousCount /* initialize to 0 */);
+            columnValueCounts.put(schemaPath, previousCount /* initialize to 0 */);
           } else {
-            previousCount = columnValueCounts.get(path);
+            previousCount = columnValueCounts.get(schemaPath);
           }
 
           boolean statsAvail = (col.getStatistics() != null && !col.getStatistics().isEmpty());
 
           if (statsAvail && previousCount != GroupScan.NO_COLUMN_STATS) {
             currentCount = col.getValueCount() - col.getStatistics().getNumNulls(); // only count non-nulls
-            columnValueCounts.put(path, previousCount + currentCount);
+            columnValueCounts.put(schemaPath, previousCount + currentCount);
           } else {
             // even if 1 chunk does not have stats, we cannot rely on the value count for this column
-            columnValueCounts.put(path, GroupScan.NO_COLUMN_STATS);
+            columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
           }
 
+          // check if this column can be used for partition pruning
+          SchemaElement se = schemaElements.get(schemaPath.getAsUnescapedPath());
+          boolean partitionColumn = checkForPartitionColumn(schemaPath, col, se, first);
+          if (partitionColumn) {
+            Map<SchemaPath,Object> map = partitionValueMap.get(file);
+            if (map == null) {
+              map = Maps.newHashMap();
+              partitionValueMap.put(file, map);
+            }
+            Object value = map.get(schemaPath);
+            Object currentValue = col.getStatistics().genericGetMax();
+            if (value != null) {
+              if (value != currentValue) {
+                columnTypeMap.remove(schemaPath);
+              }
+            } else {
+              map.put(schemaPath, currentValue);
+            }
+          } else {
+            columnTypeMap.remove(schemaPath);
+          }
         }
 
         String filePath = footer.getFile().toUri().getPath();
@@ -272,6 +355,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         index++;
 
         rowCount += rowGroup.getRowCount();
+        first = false;
       }
 
     }
@@ -281,6 +365,109 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
+  // Candidate partition columns mapped to the Drill type derived from their Parquet metadata. Entries are added
+  // only for the first row group read and are removed as soon as a later chunk is not single-valued, maps to a
+  // different type, or disagrees with an earlier row group of the same file.
+  @JsonIgnore
+  private Map<SchemaPath,MajorType> columnTypeMap = Maps.newHashMap();
+
+  /**
+   * Folds one column chunk into the set of partition-column candidates and reports whether {@code column} is still a
+   * candidate afterwards.
+   * <p>
+   * While {@code first} is true (the caller passes it for the first row group it reads) any column whose chunk
+   * statistics show exactly one value becomes a candidate and its type is recorded in {@code columnTypeMap}. For
+   * later chunks no new candidates appear; an existing candidate is dropped when this chunk is not single-valued or
+   * maps to a different type than the one recorded.
+   *
+   * @param column path of the column this chunk belongs to
+   * @param columnChunkMetaData chunk metadata, including its statistics
+   * @param se schema element for the column from the file footer, or {@code null}; used to derive the type
+   * @param first whether the chunk belongs to the first row group being read
+   * @return whether column is a potential partition column
+   */
+  private boolean checkForPartitionColumn(SchemaPath column, ColumnChunkMetaData columnChunkMetaData, SchemaElement se, boolean first) {
+    if (first) {
+      if (!hasSingleValue(columnChunkMetaData)) {
+        return false;
+      }
+      columnTypeMap.put(column, getType(columnChunkMetaData, se));
+      return true;
+    }
+
+    if (!columnTypeMap.containsKey(column)) {
+      return false;   // never qualified, or already disqualified
+    }
+
+    // Type is only consulted for single-valued chunks, mirroring the short-circuit order.
+    boolean stillCandidate = hasSingleValue(columnChunkMetaData)
+        && getType(columnChunkMetaData, se).equals(columnTypeMap.get(column));
+    if (!stillCandidate) {
+      columnTypeMap.remove(column);
+    }
+    return stillCandidate;
+  }
+
+  /**
+   * Maps a Parquet column chunk to the optional Drill type used for partition pruning. A converted (logical) type
+   * from the footer schema takes precedence when it is one handled here; otherwise the physical primitive type
+   * decides.
+   *
+   * @throws UnsupportedOperationException if the primitive type has no mapping
+   */
+  private MajorType getType(ColumnChunkMetaData columnChunkMetaData, SchemaElement schemaElement) {
+    ConvertedType convertedType = (schemaElement != null) ? schemaElement.getConverted_type() : null;
+    MinorType logicalType = (convertedType != null) ? minorTypeForConvertedType(convertedType) : null;
+    if (logicalType != null) {
+      return Types.optional(logicalType);
+    }
+    return Types.optional(minorTypeForPrimitive(columnChunkMetaData.getType()));
+  }
+
+  /** Drill minor type for a Parquet converted type, or {@code null} to fall back to the primitive type. */
+  private static MinorType minorTypeForConvertedType(ConvertedType convertedType) {
+    switch (convertedType) {
+    case DECIMAL:
+      return MinorType.DECIMAL18;
+    case DATE:
+      return MinorType.DATE;
+    case TIME_MILLIS:
+      return MinorType.TIME;
+    case TIMESTAMP_MILLIS:
+      return MinorType.TIMESTAMP;
+    case UTF8:
+      return MinorType.VARCHAR;
+    case UINT_8:
+      return MinorType.UINT1;
+    case UINT_16:
+      return MinorType.UINT2;
+    case UINT_32:
+      return MinorType.UINT4;
+    case UINT_64:
+      return MinorType.UINT8;
+    case INT_8:
+      return MinorType.TINYINT;
+    case INT_16:
+      return MinorType.SMALLINT;
+    default:
+      return null;
+    }
+  }
+
+  /** Drill minor type for a Parquet primitive (physical) type. */
+  private static MinorType minorTypeForPrimitive(PrimitiveTypeName primitive) {
+    switch (primitive) {
+    case BOOLEAN:
+      return MinorType.BIT;
+    case INT32:
+      return MinorType.INT;
+    case INT64:
+      return MinorType.BIGINT;
+    case FLOAT:
+      return MinorType.FLOAT4;
+    case DOUBLE:
+      return MinorType.FLOAT8;
+    case BINARY:
+    case FIXED_LEN_BYTE_ARRAY:
+      return MinorType.VARBINARY;
+    default:
+      // Should never hit this
+      throw new UnsupportedOperationException("Unsupported type:" + primitive);
+    }
+  }
+
+  /**
+   * Tells whether a column chunk's statistics prove it holds exactly one distinct value: statistics must be present
+   * and non-empty, and both min and max must be non-null and equal.
+   */
+  private boolean hasSingleValue(ColumnChunkMetaData columnChunkMetaData) {
+    Statistics statistics = columnChunkMetaData.getStatistics();
+    if (statistics == null || statistics.isEmpty()) {
+      return false;
+    }
+    Object min = statistics.genericGetMin();
+    Object max = statistics.genericGetMax();
+    return min != null && max != null && max.equals(min);
+  }
+
   @Override
   public void modifyFileSelection(FileSelection selection) {
     entries.clear();
@@ -289,6 +476,113 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
+  /**
+   * @return the Drill type recorded for a partition-column candidate, or {@code null} if the column is not one
+   */
+  public MajorType getTypeForColumn(SchemaPath schemaPath) {
+    final MajorType recorded = columnTypeMap.get(schemaPath);
+    return recorded;
+  }
+
+  // File path (scheme and authority stripped) -> partition column -> the single value (statistics max) that file
+  // holds for it; values are the raw boxed objects returned by the Parquet statistics.
+  private Map<String,Map<SchemaPath,Object>> partitionValueMap = Maps.newHashMap();
+
+  public void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) {
+    String f = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString();
+    MinorType type = getTypeForColumn(column).getMinorType();
+    switch (type) {
+    case INT: {
+      NullableIntVector intVector = (NullableIntVector) v;
+      Integer value = (Integer) partitionValueMap.get(f).get(column);
+      intVector.getMutator().setSafe(index, value);
+      return;
+    }
+    case SMALLINT: {
+      NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v;
+      Integer value = (Integer) partitionValueMap.get(f).get(column);
+      smallIntVector.getMutator().setSafe(index, value.shortValue());
+      return;
+    }
+    case TINYINT: {
+      NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v;
+      Integer value = (Integer) partitionValueMap.get(f).get(column);
+      tinyIntVector.getMutator().setSafe(index, value.byteValue());
+      return;
+    }
+    case UINT1: {
+      NullableUInt1Vector intVector = (NullableUInt1Vector) v;
+      Integer value = (Integer) partitionValueMap.get(f).get(column);
+      intVector.getMutator().setSafe(index, value.byteValue());
+      return;
+    }
+    case UINT2: {
+      NullableUInt2Vector intVector = (NullableUInt2Vector) v;
+      Integer value = (Integer) partitionValueMap.get(f).get(column);
+      intVector.getMutator().setSafe(index, (char) value.shortValue());
+      return;
+    }
+    case UINT4: {
+      NullableUInt4Vector intVector = (NullableUInt4Vector) v;
+      Integer value = (Integer) partitionValueMap.get(f).get(column);
+      intVector.getMutator().setSafe(index, value);
+      return;
+    }
+    case BIGINT: {
+      NullableBigIntVector bigIntVector = (NullableBigIntVector) v;
+      Long value = (Long) partitionValueMap.get(f).get(column);
+      bigIntVector.getMutator().setSafe(index, value);
+      return;
+    }
+    case FLOAT4: {
+      NullableFloat4Vector float4Vector = (NullableFloat4Vector) v;
+      Float value = (Float) partitionValueMap.get(f).get(column);
+      float4Vector.getMutator().setSafe(index, value);
+      return;
+    }
+    case FLOAT8: {
+      NullableFloat8Vector float8Vector = (NullableFloat8Vector) v;
+      Double value = (Double) partitionValueMap.get(f).get(column);
+      float8Vector.getMutator().setSafe(index, value);
+      return;
+    }
+    case VARBINARY: {
+      NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v;
+      Binary value = (Binary) partitionValueMap.get(f).get(column);
+      byte[] bytes = value.getBytes();
+      varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length);
+      return;
+    }
+    case DECIMAL18: {
+      NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v;
+      Long value = (Long) partitionValueMap.get(f).get(column);
+      decimalVector.getMutator().setSafe(index, value);
+      return;
+    }
+    case DATE: {
+      NullableDateVector dateVector = (NullableDateVector) v;
+      Integer value = (Integer) partitionValueMap.get(f).get(column);
+      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+      return;
+    }
+    case TIME: {
+      NullableTimeVector timeVector = (NullableTimeVector) v;
+      Integer value = (Integer) partitionValueMap.get(f).get(column);
+      timeVector.getMutator().set(index, value);
+      return;
+    }
+    case TIMESTAMP: {
+      NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v;
+      Long value = (Long) partitionValueMap.get(f).get(column);
+      timeStampVector.getMutator().set(index, value);
+      return;
+    }
+    case VARCHAR: {
+      NullableVarCharVector varCharVector = (NullableVarCharVector) v;
+      Binary value = (Binary) partitionValueMap.get(f).get(column);
+      byte[] bytes = value.getBytes();
+      varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length);
+      return;
+    }
+    default:
+      throw new UnsupportedOperationException("Unsupported type: " + type);
+    }
+  }
+
   public static class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork {
 
     private EndpointByteMap byteMap;
@@ -476,4 +770,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return columnValueCounts.containsKey(column) ? columnValueCounts.get(column) : 0;
   }
 
+  /**
+   * @return a new, independent list of the columns that still qualified as partition columns after all footers
+   *         were read
+   */
+  @Override
+  public List<SchemaPath> getPartitionColumns() {
+    return new ArrayList<>(columnTypeMap.keySet());
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 621f05c..12b15a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -25,17 +25,25 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.store.EventBasedRecordWriter;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -94,11 +102,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private String prefix;
   private int index = 0;
   private OperatorContext oContext;
+  // Partition columns supplied by the ParquetWriter config; may be null.
+  private List<String> partitionColumns;
+  // True when at least one partition column was supplied; gates partition-boundary checks.
+  private boolean hasPartitions;
 
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
     super();
     this.oContext = context.newOperatorContext(writer, true);
     this.codecFactory = new DirectCodecFactory(writer.getFormatPlugin().getFsConf(), oContext.getAllocator());
+    // Partitioning is enabled only for a non-null, non-empty partition column list.
+    this.partitionColumns = writer.getPartitionColumns();
+    this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
   }
 
   @Override
@@ -132,19 +144,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   }
 
   @Override
-  public void updateSchema(BatchSchema batchSchema) throws IOException {
-    if (this.batchSchema == null || !this.batchSchema.equals(batchSchema)) {
+  public void updateSchema(VectorAccessible batch) throws IOException {
+    // On the first batch, or when the batch schema differs from the current one, flush buffered records (if a
+    // schema was already active) and build a new Parquet schema.
+    if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema())) {
       if (this.batchSchema != null) {
         flush();
       }
-      this.batchSchema = batchSchema;
+      this.batchSchema = batch.getSchema();
       newSchema();
     }
+    // If the batch carries the partition-comparator bit column, pass that vector to setPartitionVector.
+    // NOTE(review): presumably consumed by newPartition() to detect partition boundaries — confirm in the template.
+    TypedFieldId fieldId = batch.getValueVectorId(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD));
+    if (fieldId != null) {
+      VectorWrapper w = batch.getValueAccessorById(BitVector.class, fieldId.getFieldIds());
+      setPartitionVector((BitVector) w.getValueVector());
+    }
   }
 
   private void newSchema() throws IOException {
     List<Type> types = Lists.newArrayList();
     for (MaterializedField field : batchSchema) {
+      if (field.getPath().equals(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD))) {
+        continue;
+      }
       types.add(getType(field));
     }
     schema = new MessageType("root", types);
@@ -189,6 +209,22 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
   }
 
+  /**
+   * When partition columns are configured, asks {@code newPartition(index)} whether the record at {@code index}
+   * starts a new partition and, if so, flushes the buffered records and re-initialises via {@code newSchema()}.
+   * Any exception raised along the way is rethrown wrapped in a DrillRuntimeException.
+   */
+  @Override
+  public void checkForNewPartition(int index) {
+    if (hasPartitions) {
+      try {
+        if (newPartition(index)) {
+          flush();
+          newSchema();
+        }
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+  }
+
   private void flush() throws IOException {
     if (recordCount > 0) {
       parquetFileWriter.startBlock(recordCount);

http://git-wip-us.apache.org/repos/asf/drill/blob/5a34d819/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 31b1fbe..8a74b49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.StringOutputRecordWriter;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;


Mime
View raw message