drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject drill git commit: DRILL-3735: For partition pruning divide up the partition lists into sublists of 64K each and iterate over each sublist.
Date Mon, 21 Sep 2015 22:39:35 GMT
Repository: drill
Updated Branches:
  refs/heads/master 3c89b30d4 -> 9f54aac33


DRILL-3735: For partition pruning divide up the partition lists into sublists of 64K each
and iterate over each sublist.

Add abstract base class for various partition descriptors.  Add logging messages in PruneScanRule
for better debuggability.

Address review comments.

Close apache/drill#156


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9f54aac3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9f54aac3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9f54aac3

Branch: refs/heads/master
Commit: 9f54aac33df3e783c0192ab56c7e1313dbc823fa
Parents: 3c89b30
Author: Aman Sinha <asinha@maprtech.com>
Authored: Sat Sep 12 12:57:12 2015 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Mon Sep 21 14:25:45 2015 -0700

----------------------------------------------------------------------
 .../planner/sql/HivePartitionDescriptor.java    |  27 +--
 .../planner/AbstractPartitionDescriptor.java    |  58 ++++++
 .../planner/FileSystemPartitionDescriptor.java  |  26 +--
 .../planner/ParquetPartitionDescriptor.java     |  26 +--
 .../drill/exec/planner/PartitionDescriptor.java |  11 +-
 .../logical/partition/PruneScanRule.java        | 179 +++++++++++--------
 6 files changed, 215 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9f54aac3/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index 0328af0..11c6455 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -18,11 +18,13 @@
 package org.apache.drill.exec.planner.sql;
 
 import io.netty.buffer.DrillBuf;
+
 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.AbstractPartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionLocation;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -43,8 +45,10 @@ import java.util.Map;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
+import com.google.common.collect.Lists;
+
 // Partition descriptor for hive tables
-public class HivePartitionDescriptor implements PartitionDescriptor {
+public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
 
   private final Map<String, Integer> partitionMap = new HashMap<>();
   private final int numPartitionLevels;
@@ -107,16 +111,6 @@ public class HivePartitionDescriptor implements PartitionDescriptor {
   }
 
   @Override
-  public List<PartitionLocation> getPartitions() {
-    List<PartitionLocation> partitions = new LinkedList<>();
-    HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry;
-    for (Partition partition: origEntry.getPartitions()) {
-      partitions.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation()));
-    }
-    return partitions;
-  }
-
-  @Override
   public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation>
partitions,
                                        BitSet partitionColumnBitSet, Map<Integer, String>
fieldNameMap) {
     int record = 0;
@@ -164,4 +158,15 @@ public class HivePartitionDescriptor implements PartitionDescriptor {
     return partitionMap.get(name);
   }
 
+  @Override
+  protected void createPartitionSublists() {
+    List<PartitionLocation> locations = new LinkedList<>();
+    HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry;
+    for (Partition partition: origEntry.getPartitions()) {
+      locations.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation()));
+    }
+    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
+    sublistsCreated = true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9f54aac3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
new file mode 100644
index 0000000..c9ca448
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for file system based partition descriptors and Hive partition descriptors.
+ *
+ */
+public abstract class AbstractPartitionDescriptor implements PartitionDescriptor, Iterable<List<PartitionLocation>>
{
+
+  /**
+   * A sequence of sublists of partition locations combined into a single super list.
+   * The size of each sublist is at most {@link PartitionDescriptor#PARTITION_BATCH_SIZE}.
+   * For example if the size is 3, the complete list could be: {(a, b, c), (d, e, f), (g, h)}
+   */
+  protected List<List<PartitionLocation>> locationSuperList;
+  /**
+   * Flag to indicate if the sublists of the partition locations have been created
+   */
+  protected boolean sublistsCreated = false;
+
+  /**
+   * Create sublists of the partition locations, each sublist of size
+   * at most {@link PartitionDescriptor#PARTITION_BATCH_SIZE}
+   */
+  protected abstract void createPartitionSublists() ;
+
+  /**
+   * Iterator that traverses over the super list of partition locations and
+   * each time returns a single sublist of partition locations.
+   */
+  @Override
+  public Iterator<List<PartitionLocation>> iterator() {
+    if (!sublistsCreated) {
+      createPartitionSublists();
+    }
+    return locationSuperList.iterator();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9f54aac3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 9816f14..c10d0af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -24,7 +24,9 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.calcite.util.BitSets;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -40,7 +42,7 @@ import org.apache.drill.exec.vector.ValueVector;
 
 
 // partition descriptor for file system based tables
-public class FileSystemPartitionDescriptor implements PartitionDescriptor {
+public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
 
   static final int MAX_NESTED_SUBDIRS = 10;          // allow up to 10 nested sub-directories
 
@@ -87,16 +89,6 @@ public class FileSystemPartitionDescriptor implements PartitionDescriptor
{
   }
 
   @Override
-  public List<PartitionLocation> getPartitions() {
-    List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles();
-    List<PartitionLocation> partitions = new LinkedList<>();
-    for (String file: fileLocations) {
-      partitions.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(),
file));
-    }
-    return partitions;
-  }
-
-  @Override
   public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation>
partitions,
                                        BitSet partitionColumnBitSet, Map<Integer, String>
fieldNameMap) {
     int record = 0;
@@ -133,4 +125,16 @@ public class FileSystemPartitionDescriptor implements PartitionDescriptor
{
     final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
     return origSelection.getSelection().selectionRoot;
   }
+
+  @Override
+  protected void createPartitionSublists() {
+    List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles();
+    List<PartitionLocation> locations = new LinkedList<>();
+    for (String file: fileLocations) {
+      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(),
file));
+    }
+    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
+    sublistsCreated = true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9f54aac3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index f8af300..880daa6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.vector.ValueVector;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.BitSet;
 import java.util.LinkedList;
@@ -40,7 +42,7 @@ import java.util.Set;
 /**
  * PartitionDescriptor that describes partitions based on column names instead of directory
structure
  */
-public class ParquetPartitionDescriptor implements PartitionDescriptor {
+public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
 
   private final List<SchemaPath> partitionColumns;
   private final DrillScanRel scanRel;
@@ -85,16 +87,6 @@ public class ParquetPartitionDescriptor implements PartitionDescriptor
{
   }
 
   @Override
-  public List<PartitionLocation> getPartitions() {
-    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
-    List<PartitionLocation> partitions = new LinkedList<>();
-    for (String file: fileLocations) {
-      partitions.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(),
file));
-    }
-    return partitions;
-  }
-
-  @Override
   public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation>
partitions,
                                        BitSet partitionColumnBitSet, Map<Integer, String>
fieldNameMap) {
     int record = 0;
@@ -125,4 +117,16 @@ public class ParquetPartitionDescriptor implements PartitionDescriptor
{
     final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
     return origSelection.getSelection().selectionRoot;
   }
+
+  @Override
+  protected void createPartitionSublists() {
+    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
+    List<PartitionLocation> locations = new LinkedList<>();
+    for (String file: fileLocations) {
+      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(),
file));
+    }
+    locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
+    sublistsCreated = true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9f54aac3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 6a4ee9e..726d8bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -17,21 +17,20 @@
  */
 package org.apache.drill.exec.planner;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.vector.ValueVector;
 
-import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
 // Interface used to describe partitions. Currently used by file system based partitions
and hive partitions
-public interface PartitionDescriptor {
+public interface PartitionDescriptor extends Iterable<List<PartitionLocation>>
{
+
+  public static final int PARTITION_BATCH_SIZE = Character.MAX_VALUE;
 
   /* Get the hierarchy index of the given partition
    * For eg: if we have the partition laid out as follows
@@ -56,12 +55,10 @@ public interface PartitionDescriptor {
 
   public GroupScan createNewGroupScan(List<String> newFiles) throws Exception;
 
-  public List<PartitionLocation> getPartitions();
-
   /**
    * Method creates an in memory representation of all the partitions. For each level of
partitioning we
    * will create a value vector which this method will populate for all the partitions with
the values of the
-   * partioning key
+   * partitioning key
    * @param vectors - Array of vectors in the container that need to be populated
    * @param partitions - List of all the partitions that exist in the table
    * @param partitionColumnBitSet - Partition columns selected in the query

http://git-wip-us.apache.org/repos/asf/drill/blob/9f54aac3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 1c5c7e4..562cb71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -154,20 +154,19 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
     BitSet columnBitset = new BitSet();
     BitSet partitionColumnBitSet = new BitSet();
 
-    {
-      int relColIndex = 0;
-      for (String field : fieldNames) {
-        final Integer partitionIndex = descriptor.getIdIfValid(field);
-        if (partitionIndex != null) {
-          fieldNameMap.put(partitionIndex, field);
-          partitionColumnBitSet.set(partitionIndex);
-          columnBitset.set(relColIndex);
-        }
-        relColIndex++;
+    int relColIndex = 0;
+    for (String field : fieldNames) {
+      final Integer partitionIndex = descriptor.getIdIfValid(field);
+      if (partitionIndex != null) {
+        fieldNameMap.put(partitionIndex, field);
+        partitionColumnBitSet.set(partitionIndex);
+        columnBitset.set(relColIndex);
       }
+      relColIndex++;
     }
 
     if (partitionColumnBitSet.isEmpty()) {
+      logger.debug("No partition columns are projected from the scan..continue.");
       return;
     }
 
@@ -176,81 +175,94 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
     RexNode pruneCondition = c.getFinalCondition();
 
     if (pruneCondition == null) {
+      logger.debug("No conditions were found eligible for partition pruning.");
       return;
     }
 
-
     // set up the partitions
-    final GroupScan groupScan = scanRel.getGroupScan();
-    List<PartitionLocation> partitions = descriptor.getPartitions();
-
-    if (partitions.size() > Character.MAX_VALUE) {
-      return;
-    }
-
-    final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)),
allocator);
-    final VectorContainer container = new VectorContainer();
-
-    try {
-      final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
-      for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
-        SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
-        MajorType type = descriptor.getVectorType(column, settings);
-        MaterializedField field = MaterializedField.create(column, type);
-        ValueVector v = TypeHelper.getNewVector(field, allocator);
-        v.allocateNew();
-        vectors[partitionColumnIndex] = v;
-        container.add(v);
-      }
-
-      // populate partition vectors.
-      descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
-
-      // materialize the expression
-      logger.debug("Attempting to prune {}", pruneCondition);
-      final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings),
scanRel, pruneCondition);
-      final ErrorCollectorImpl errors = new ErrorCollectorImpl();
-
-      LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container,
errors, optimizerContext.getFunctionRegistry());
-      // Make sure pruneCondition's materialized expression is always of BitType, so that
-      // it's same as the type of output vector.
-      if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
-        materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
-            materializedExpr,
-            materializedExpr.getMajorType().getMinorType(),
-            optimizerContext.getFunctionRegistry(),
-            errors);
-      }
-
-      if (errors.getErrorCount() != 0) {
-        logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
+    List<String> newFiles = Lists.newArrayList();
+    long numTotal = 0; // total number of partitions
+    int batchIndex = 0;
+    String firstLocation = null;
+    LogicalExpression materializedExpr = null;
+
+    // Outer loop: iterate over a list of batches of PartitionLocations
+    for (List<PartitionLocation> partitions : descriptor) {
+      numTotal += partitions.size();
+      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
+      if (batchIndex == 0) { // save the first location in case everything is pruned
+        firstLocation = partitions.get(0).getEntirePartitionLocation();
       }
+      final NullableBitVector output = new NullableBitVector(MaterializedField.create("",
Types.optional(MinorType.BIT)), allocator);
+      final VectorContainer container = new VectorContainer();
+
+      try {
+        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
+        for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
+          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
+          MajorType type = descriptor.getVectorType(column, settings);
+          MaterializedField field = MaterializedField.create(column, type);
+          ValueVector v = TypeHelper.getNewVector(field, allocator);
+          v.allocateNew();
+          vectors[partitionColumnIndex] = v;
+          container.add(v);
+        }
 
-      output.allocateNew(partitions.size());
-      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output,
materializedExpr);
-      int record = 0;
+        // populate partition vectors.
+        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
+
+        // materialize the expression; only need to do this once
+        if (batchIndex == 0) {
+          materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container);
+          if (materializedExpr == null) {
+            // continue without partition pruning; no need to log anything here since
+            // materializePruneExpr logs it already
+            return;
+          }
+        }
 
-      List<String> newFiles = Lists.newArrayList();
-      for(PartitionLocation part: partitions){
-        if(!output.getAccessor().isNull(record) && output.getAccessor().get(record)
== 1){
-          newFiles.add(part.getEntirePartitionLocation());
+        output.allocateNew(partitions.size());
+        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output,
materializedExpr);
+        int recordCount = 0;
+        int qualifiedCount = 0;
+
+        // Inner loop: within each batch iterate over the PartitionLocations
+        for(PartitionLocation part: partitions){
+          if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount)
== 1){
+            newFiles.add(part.getEntirePartitionLocation());
+            qualifiedCount++;
+          }
+          recordCount++;
+        }
+        logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex,
recordCount, qualifiedCount);
+        batchIndex++;
+      } catch (Exception e) {
+        logger.warn("Exception while trying to prune partition.", e);
+        return; // continue without partition pruning
+      } finally {
+        container.clear();
+        if (output != null) {
+          output.clear();
         }
-        record++;
       }
+    }
+
+    try {
 
       boolean canDropFilter = true;
 
       if (newFiles.isEmpty()) {
-        newFiles.add(partitions.get(0).getEntirePartitionLocation());
+        assert firstLocation != null;
+        newFiles.add(firstLocation);
         canDropFilter = false;
       }
 
-      if (newFiles.size() == partitions.size()) {
+      if (newFiles.size() == numTotal) {
+        logger.info("No partitions were eligible for pruning");
         return;
       }
 
-      logger.debug("Pruned {} => {}", partitions.size(), newFiles.size());
-
+      logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size());
 
       List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
       List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
@@ -284,13 +296,36 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule
{
       }
 
     } catch (Exception e) {
-      logger.warn("Exception while trying to prune partition.", e);
-    } finally {
-      container.clear();
-      if (output != null) {
-        output.clear();
-      }
+      logger.warn("Exception while using the pruned partitions.", e);
+    }
+  }
+
+  protected LogicalExpression materializePruneExpr(RexNode pruneCondition,
+      PlannerSettings settings,
+      RelNode scanRel,
+      VectorContainer container
+      ) {
+    // materialize the expression
+    logger.debug("Attempting to prune {}", pruneCondition);
+    final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel,
pruneCondition);
+    final ErrorCollectorImpl errors = new ErrorCollectorImpl();
+
+    LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container,
errors, optimizerContext.getFunctionRegistry());
+    // Make sure pruneCondition's materialized expression is always of BitType, so that
+    // it's same as the type of output vector.
+    if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
+      materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
+          materializedExpr,
+          materializedExpr.getMajorType().getMinorType(),
+          optimizerContext.getFunctionRegistry(),
+          errors);
+    }
+
+    if (errors.getErrorCount() != 0) {
+      logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
+      return null;
     }
+    return materializedExpr;
   }
 
   protected OptimizerRulesContext getOptimizerRulesContext() {


Mime
View raw message