drill-commits mailing list archives

From: amansi...@apache.org
Subject: drill git commit: DRILL-4530: Optimize partition pruning with metadata caching for the single partition case.
Date: Tue, 19 Jul 2016 00:18:14 GMT
Repository: drill
Updated Branches:
  refs/heads/master 70aba772a -> 4f818d074


DRILL-4530: Optimize partition pruning with metadata caching for the single partition case.

 - Enhance PruneScanRule to detect a single partition based on the dirN columns referenced in the filter.
 - Add a new EXPANDED_PARTIAL status to FileSelection.
 - Create a separate .directories metadata file so directories can be pruned before files (see the sketch after this list).
 - Introduce a cacheFileRoot attribute to track the parent directory of the cache file after partition pruning.
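
The .directories file written by this patch is a small JSON cache holding only the directory list (see ParquetTableMetadataDirs in the Metadata.java diff below); it is produced alongside the regular metadata cache, e.g. when REFRESH TABLE METADATA is run on the table. A minimal sketch of consulting it through the readMetadataDirs() API added in this patch; the table root path and the wrapping class/method are illustrative only:

    import java.io.IOException;
    import org.apache.drill.exec.store.parquet.Metadata;
    import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataDirs;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class DirCacheProbe {
      // Sketch only: list the directories recorded in the directories-only cache file.
      // "/data/orders" is a hypothetical table root, not taken from this commit.
      static void listPrunableDirs(FileSystem fs) throws IOException {
        Path dirMetaPath = new Path("/data/orders", Metadata.METADATA_DIRECTORIES_FILENAME);
        if (fs.exists(dirMetaPath)) {
          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString());
          for (String dir : mDirs.getDirectories()) {
            System.out.println(dir);  // each entry can be pruned before any file expansion
          }
        }
      }
    }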

Check that prefix components are non-null the very first time the single-partition info is initialized.
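
This check lives inside PruneScanRule's inner loop (hunk below): the first qualifying partition seeds an spInfo array with the dirN values referenced by the filter, and any later qualifying partition that differs in depth or value disqualifies the single-partition optimization. A simplified, hypothetical extraction of that logic into a standalone method:

    // Hypothetical refactoring of the inline check in PruneScanRule (this patch).
    // spInfo: dirN values of the first qualifying partition; maxIndex: deepest
    // referenced dirN level; parts/tmpIndex: the candidate partition being compared.
    static boolean stillSinglePartition(String[] spInfo, int maxIndex,
                                        String[] parts, int tmpIndex) {
      if (spInfo == null) {                // very first qualifying partition
        for (int j = 0; j <= tmpIndex; j++) {
          if (parts[j] == null) {          // prefix components must be non-null
            return false;
          }
        }
        return true;                       // caller then records spInfo = parts, maxIndex = tmpIndex
      }
      if (maxIndex != tmpIndex) {          // depths differ => more than one partition
        return false;
      }
      for (int j = 0; j <= maxIndex; j++) {
        if (!spInfo[j].equals(parts[j])) { // values differ => more than one partition
          return false;
        }
      }
      return true;
    }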

Add a separate interface method to create a scan using a cacheFileRoot.

Create the filenames list with unique names using fileSet when available. Add several unit tests.

Populate only the fileSet when expanding using the metadata cache.

Remove the cacheFileRoot parameter from FileGroupScan's clone() method and instead obtain it from FileSelection.

Keep track of whether all partitions were previously pruned and handle this state where needed.
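
Concretely, when the filter eliminates every partition, the rule keeps one partition so downstream operators can still derive a schema, records the all-pruned state, and, for composite (directory) partitions, points cacheFileRoot at the surviving directory. Condensed from the PruneScanRule hunk below:

    // Condensed from this patch: fallback when all partitions were pruned.
    if (newPartitions.isEmpty()) {
      // retain a single partition so a schema can still be constructed
      newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0));
      canDropFilter = false;               // the filter must remain in the plan
      wasAllPartitionsPruned = true;
      if (firstLocation.isCompositePartition()) {
        cacheFileRoot = descriptor.getBaseTableLocation()
            + firstLocation.getCompositePartitionPath();
      }
    }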

close apache/drill#519


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4f818d07
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4f818d07
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4f818d07

Branch: refs/heads/master
Commit: 4f818d074373f3572cb3c2e99d1c9c43df2090aa
Parents: 70aba77
Author: Aman Sinha <asinha@maprtech.com>
Authored: Fri Mar 25 12:55:59 2016 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Mon Jul 18 17:01:24 2016 -0700

----------------------------------------------------------------------
 .../planner/sql/HivePartitionDescriptor.java    |   3 +-
 .../drill/exec/physical/base/FileGroupScan.java |   1 +
 .../planner/AbstractPartitionDescriptor.java    |  14 ++
 .../exec/planner/DFSDirPartitionLocation.java   |  12 ++
 .../exec/planner/DFSFilePartitionLocation.java  |   7 +-
 .../planner/FileSystemPartitionDescriptor.java  |  57 ++++++--
 .../planner/ParquetPartitionDescriptor.java     |  21 ++-
 .../drill/exec/planner/PartitionDescriptor.java |  20 ++-
 .../drill/exec/planner/PartitionLocation.java   |   6 +
 .../exec/planner/SimplePartitionLocation.java   |   5 +
 .../partition/FindPartitionConditions.java      |  12 ++
 .../logical/partition/PruneScanRule.java        | 114 +++++++++++++--
 .../drill/exec/store/dfs/DrillFileSystem.java   |   1 -
 .../drill/exec/store/dfs/FileSelection.java     | 120 ++++++++++++++--
 .../drill/exec/store/parquet/Metadata.java      |  94 ++++++++++++-
 .../exec/store/parquet/ParquetFormatPlugin.java |  21 ++-
 .../exec/store/parquet/ParquetGroupScan.java    |  85 +++++++-----
 .../store/parquet/TestParquetMetadataCache.java | 139 ++++++++++++++++---
 .../multilevel/parquet2/1994/Q1/1.parquet       | Bin 0 -> 2015 bytes
 .../multilevel/parquet2/1994/Q1/2.parquet       | Bin 0 -> 2015 bytes
 .../multilevel/parquet2/1994/Q2/1.parquet       | Bin 0 -> 2130 bytes
 .../multilevel/parquet2/1994/Q2/2.parquet       | Bin 0 -> 2130 bytes
 .../multilevel/parquet2/1994/Q3/1.parquet       | Bin 0 -> 2054 bytes
 .../multilevel/parquet2/1994/Q3/2.parquet       | Bin 0 -> 2054 bytes
 .../multilevel/parquet2/1994/Q4/1.parquet       | Bin 0 -> 2056 bytes
 .../multilevel/parquet2/1994/Q4/2.parquet       | Bin 0 -> 2056 bytes
 .../multilevel/parquet2/1995/Q1/1.parquet       | Bin 0 -> 2180 bytes
 .../multilevel/parquet2/1995/Q1/2.parquet       | Bin 0 -> 2180 bytes
 .../multilevel/parquet2/1995/Q2/1.parquet       | Bin 0 -> 2110 bytes
 .../multilevel/parquet2/1995/Q2/2.parquet       | Bin 0 -> 2110 bytes
 .../multilevel/parquet2/1995/Q3/1.parquet       | Bin 0 -> 1902 bytes
 .../multilevel/parquet2/1995/Q3/2.parquet       | Bin 0 -> 1902 bytes
 .../multilevel/parquet2/1995/Q4/1.parquet       | Bin 0 -> 2013 bytes
 .../multilevel/parquet2/1995/Q4/2.parquet       | Bin 0 -> 2013 bytes
 34 files changed, 627 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index c8e45ca..d42aea7 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -86,6 +86,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
     return numPartitionLevels;
   }
 
+  @Override
   public String getBaseTableLocation() {
     HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry;
     return origEntry.table.getTable().getSd().getLocation();
@@ -151,7 +152,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  public TableScan createTableScan(List<PartitionLocation> newPartitions) throws Exception {
+  public TableScan createTableScan(List<PartitionLocation> newPartitions, boolean wasAllPartitionsPruned /* ignored */) throws Exception {
     GroupScan newGroupScan = createNewGroupScan(newPartitions);
     return new DrillScanRel(scanRel.getCluster(),
         scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java
index 552d1e8..9d4767e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileGroupScan.java
@@ -26,4 +26,5 @@ public interface FileGroupScan extends GroupScan {
   public void modifyFileSelection(FileSelection selection);
 
   public FileGroupScan clone(FileSelection selection) throws IOException;
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
index c9ca448..9879492 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.planner;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.calcite.rel.core.TableScan;
+
 /**
  * Abstract base class for file system based partition descriptors and Hive partition descriptors.
  *
@@ -55,4 +57,16 @@ public abstract class AbstractPartitionDescriptor implements PartitionDescriptor
     return locationSuperList.iterator();
   }
 
+  @Override
+  public boolean supportsSinglePartOptimization() {
+    return false;
+  }
+
+
+  @Override
+  public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
+      boolean isAllPruned) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
index da3aa68..a4d2b81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
@@ -67,4 +67,16 @@ public class DFSDirPartitionLocation implements PartitionLocation {
     return true;
   }
 
+  @Override
+  public String getCompositePartitionPath() {
+    String path = "";
+    for (int i=0; i < dirs.length; i++) {
+      if (dirs[i] == null) { // get the prefix
+        break;
+      }
+      path += "/" + dirs[i];
+    }
+    return path;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
index 6e42f3b..cac5d93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
@@ -26,7 +26,7 @@ public class DFSFilePartitionLocation extends SimplePartitionLocation {
   private final String[] dirs;
   private final String file;
 
-  public DFSFilePartitionLocation(int max, String selectionRoot, String file) {
+  public DFSFilePartitionLocation(int max, String selectionRoot, String file, boolean hasDirsOnly) {
     this.file = file;
     this.dirs = new String[max];
 
@@ -42,8 +42,8 @@ public class DFSFilePartitionLocation extends SimplePartitionLocation {
       postPath = postPath.substring(1);
     }
     String[] mostDirs = postPath.split("/");
-    int maxLoop = Math.min(max, mostDirs.length - 1);
-    for(int i =0; i < maxLoop; i++){
+    int maxLoop = Math.min(max, hasDirsOnly ? mostDirs.length : mostDirs.length - 1);
+    for(int i =0; i < maxLoop; i++) {
       this.dirs[i] = mostDirs[i];
     }
   }
@@ -71,5 +71,6 @@ public class DFSFilePartitionLocation extends SimplePartitionLocation {
   public String[] getDirs() {
     return dirs;
   }
+
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index cfc8542..ba18bbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -35,6 +35,7 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -139,15 +140,17 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
     return partitionLabel + index;
   }
 
-  private String getBaseTableLocation() {
+  @Override
+  public String getBaseTableLocation() {
     final FormatSelection origSelection = (FormatSelection) table.getSelection();
     return origSelection.getSelection().selectionRoot;
   }
 
   @Override
   protected void createPartitionSublists() {
-    final Collection<String> fileLocations = getFileLocations();
+    final Pair<Collection<String>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus();
     List<PartitionLocation> locations = new LinkedList<>();
+    boolean hasDirsOnly = fileLocationsAndStatus.right;
 
     final String selectionRoot = getBaseTableLocation();
 
@@ -159,8 +162,8 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
 
     // Figure out the list of leaf subdirectories. For each leaf subdirectory, find the list of files (DFSFilePartitionLocation)
     // it contains.
-    for (String file: fileLocations) {
-      DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file);
+    for (String file: fileLocationsAndStatus.left) {
+      DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file, hasDirsOnly);
 
       final String[] dirs = dfsFilePartitionLocation.getDirs();
       final List<String> dirList = Arrays.asList(dirs);
@@ -180,25 +183,34 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
     sublistsCreated = true;
   }
 
-  protected Collection<String> getFileLocations() {
+  protected Pair<Collection<String>, Boolean> getFileLocationsAndStatus() {
     Collection<String> fileLocations = null;
+    Pair<Collection<String>, Boolean> fileLocationsAndStatus = null;
+    boolean isExpandedPartial = false;
     if (scanRel instanceof DrillScanRel) {
       // If a particular GroupScan provides files, get the list of files from there rather than
       // DrillTable because GroupScan would have the updated version of the selection
       final DrillScanRel drillScan = (DrillScanRel) scanRel;
       if (drillScan.getGroupScan().hasFiles()) {
         fileLocations = drillScan.getGroupScan().getFiles();
+        isExpandedPartial = false;
       } else {
-        fileLocations = ((FormatSelection) table.getSelection()).getAsFiles();
+        FileSelection selection = ((FormatSelection) table.getSelection()).getSelection();
+        fileLocations = selection.getFiles();
+        isExpandedPartial = selection.isExpandedPartial();
       }
     } else if (scanRel instanceof EnumerableTableScan) {
-      fileLocations = ((FormatSelection) table.getSelection()).getAsFiles();
+      FileSelection selection = ((FormatSelection) table.getSelection()).getSelection();
+      fileLocations = selection.getFiles();
+      isExpandedPartial = selection.isExpandedPartial();
     }
-    return fileLocations;
+    fileLocationsAndStatus = Pair.of(fileLocations, isExpandedPartial);
+    return fileLocationsAndStatus;
   }
 
   @Override
-  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation) throws Exception {
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
+      boolean wasAllPartitionsPruned) throws Exception {
     List<String> newFiles = Lists.newArrayList();
     for (final PartitionLocation location : newPartitionLocation) {
       if (!location.isCompositePartition()) {
@@ -212,8 +224,11 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
     }
 
     if (scanRel instanceof DrillScanRel) {
-      final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
-      final FileGroupScan newGroupScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+      final FormatSelection formatSelection = (FormatSelection)table.getSelection();
+      final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
+          cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
+      final FileGroupScan newGroupScan =
+          ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
       return new DrillScanRel(scanRel.getCluster(),
                       scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
                       scanRel.getTable(),
@@ -222,16 +237,19 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
                       ((DrillScanRel) scanRel).getColumns(),
                       true /*filter pushdown*/);
     } else if (scanRel instanceof EnumerableTableScan) {
-      return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles);
+      return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot,
+          wasAllPartitionsPruned);
     } else {
       throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
     }
   }
 
-  private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles) {
+  private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot,
+      boolean wasAllPartitionsPruned) {
     final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
     final FormatSelection formatSelection = (FormatSelection) table.getSelection();
-    final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
+    final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
+            cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
     final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
     final DrillTranslatableTable newTable = new DrillTranslatableTable(
             new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
@@ -242,4 +260,15 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
     return EnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl);
   }
 
+  @Override
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
+      boolean wasAllPartitionsPruned) throws Exception {
+    return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
+  }
+
+  @Override
+  public boolean supportsSinglePartOptimization() {
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index 07e1412..2c8ca95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -80,8 +80,10 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
     return partitionColumns.size();
   }
 
-  private GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
-    final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation());
+  private GroupScan createNewGroupScan(List<String> newFiles, String cacheFileRoot,
+      boolean wasAllPartitionsPruned) throws IOException {
+    final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(),
+        cacheFileRoot, wasAllPartitionsPruned);
     final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
     return newScan;
   }
@@ -113,7 +115,8 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
     return ((ParquetGroupScan) scanRel.getGroupScan()).getTypeForColumn(column);
   }
 
-  private String getBaseTableLocation() {
+  @Override
+  public String getBaseTableLocation() {
     final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
     return origSelection.getSelection().selectionRoot;
   }
@@ -130,13 +133,14 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation) throws Exception {
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
+      boolean wasAllPartitionsPruned) throws Exception {
     List<String> newFiles = Lists.newArrayList();
     for (final PartitionLocation location : newPartitionLocation) {
       newFiles.add(location.getEntirePartitionLocation());
     }
 
-    final GroupScan newGroupScan = createNewGroupScan(newFiles);
+    final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned);
 
     return new DrillScanRel(scanRel.getCluster(),
         scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
@@ -146,4 +150,11 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
         scanRel.getColumns(),
         true /*filter pushdown*/);
   }
+
+  @Override
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
+      boolean wasAllPartitionsPruned) throws Exception {
+    return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index f08d713..4d1bfdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -77,9 +77,27 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
   /**
    * Methods create a new TableScan rel node, given the lists of new partitions or new files to SCAN.
    * @param newPartitions
+   * @param wasAllPartitionsPruned
    * @return
    * @throws Exception
    */
-  public TableScan createTableScan(List<PartitionLocation> newPartitions) throws Exception;
+  public TableScan createTableScan(List<PartitionLocation> newPartitions,
+      boolean wasAllPartitionsPruned) throws Exception;
+
+  /**
+   * Create a new TableScan rel node, given the lists of new partitions or new files to scan and a path
+   * to a metadata cache file
+   * @param newPartitions
+   * @param cacheFileRoot
+   * @param wasAllPartitionsPruned
+   * @return
+   * @throws Exception
+   */
+  public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
+      boolean wasAllPartitionsPruned) throws Exception;
+
+  public boolean supportsSinglePartOptimization();
+
+  public String getBaseTableLocation();
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
index f94e8cb..b6396b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
@@ -54,4 +54,10 @@ public interface PartitionLocation {
    * Returns if this is a simple or composite partition.
    */
   public boolean isCompositePartition();
+
+  /**
+   * Returns the path string of directory names only for composite partition
+   */
+  public String getCompositePartitionPath();
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
index 523169e..7c4c22f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
@@ -34,6 +34,11 @@ public abstract  class SimplePartitionLocation implements PartitionLocation{
   }
 
   @Override
+  public String getCompositePartitionPath() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public List<SimplePartitionLocation> getPartitionLocationRecursive() {
     return ImmutableList.of(this);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java
index d1446b6..620b6b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java
@@ -84,6 +84,10 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
 
   private final BitSet dirs;
 
+  // The Scan could be projecting several dirN columns but we are only interested in the
+  // ones that are referenced by the Filter, so keep track of such referenced dirN columns.
+  private final BitSet referencedDirs;
+
   private final List<PushDirFilter> pushStatusStack =  Lists.newArrayList();
   private final Deque<OpState> opStack = new ArrayDeque<OpState>();
 
@@ -103,6 +107,7 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
     // go deep
     super(true);
     this.dirs = dirs;
+    this.referencedDirs = new BitSet(dirs.size());
   }
 
   public FindPartitionConditions(BitSet dirs, RexBuilder builder) {
@@ -110,6 +115,7 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
     super(true);
     this.dirs = dirs;
     this.builder = builder;
+    this.referencedDirs = new BitSet(dirs.size());
   }
 
   public void analyze(RexNode exp) {
@@ -131,6 +137,10 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
     return resultCondition;
   }
 
+  public BitSet getReferencedDirs() {
+    return referencedDirs;
+  }
+
   private Void pushVariable() {
     pushStatusStack.add(PushDirFilter.NO_PUSH);
     return null;
@@ -222,6 +232,8 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
     if(dirs.get(inputRef.getIndex())){
       pushStatusStack.add(PushDirFilter.PUSH);
       addResult(inputRef);
+      referencedDirs.set(inputRef.getIndex());
+
     }else{
       pushStatusStack.add(PushDirFilter.NO_PUSH);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index a9fb101..209e03d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionLocation;
+import org.apache.drill.exec.planner.SimplePartitionLocation;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -60,7 +61,6 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRule;
@@ -68,6 +68,7 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rex.RexNode;
+import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -143,6 +144,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
   }
 
   protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
+
     final String pruningClassName = getClass().getName();
     logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
     Stopwatch totalPruningTime = Stopwatch.createStarted();
@@ -166,6 +168,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     List<String> fieldNames = scanRel.getRowType().getFieldNames();
     BitSet columnBitset = new BitSet();
     BitSet partitionColumnBitSet = new BitSet();
+    Map<Integer, Integer> partitionMap = Maps.newHashMap();
 
     int relColIndex = 0;
     for (String field : fieldNames) {
@@ -174,6 +177,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         fieldNameMap.put(partitionIndex, field);
         partitionColumnBitSet.set(partitionIndex);
         columnBitset.set(relColIndex);
+        // mapping between the relColIndex and partitionIndex
+        partitionMap.put(relColIndex, partitionIndex);
       }
       relColIndex++;
     }
@@ -193,6 +198,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
     c.analyze(condition);
     RexNode pruneCondition = c.getFinalCondition();
+    BitSet referencedDirsBitSet = c.getReferencedDirs();
 
     logger.info("Total elapsed time to build and analyze filter tree: {} ms",
         miscTimer.elapsed(TimeUnit.MILLISECONDS));
@@ -210,6 +216,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     int batchIndex = 0;
     PartitionLocation firstLocation = null;
     LogicalExpression materializedExpr = null;
+    boolean checkForSingle = descriptor.supportsSinglePartOptimization();
+    boolean isSinglePartition = true;
+    String[] spInfo = null;
+    int maxIndex = -1;
 
     // Outer loop: iterate over a list of batches of PartitionLocations
     for (List<PartitionLocation> partitions : descriptor) {
@@ -269,13 +279,59 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         int recordCount = 0;
         int qualifiedCount = 0;
 
-        // Inner loop: within each batch iterate over the PartitionLocations
-        for(PartitionLocation part: partitions){
-          if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
-            newPartitions.add(part);
-            qualifiedCount++;
+        if (checkForSingle &&
+            partitions.get(0).isCompositePartition() /* apply single partition check only for composite partitions */) {
+          // Inner loop: within each batch iterate over the PartitionLocations
+          for (PartitionLocation part : partitions) {
+            assert part.isCompositePartition();
+            if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
+              newPartitions.add(part);
+              if (isSinglePartition) { // only need to do this if we are already single partition
+                // compose the array of partition values for the directories that are referenced by filter:
+                // e.g suppose the dir hierarchy is year/quarter/month and the query is:
+                //     SELECT * FROM T WHERE dir0=2015 AND dir1 = 'Q1',
+                // then for 2015/Q1/Feb, this will have ['2015', 'Q1', null]
+                // Note that we are not using the PartitionLocation here but composing a different list because
+                // we are only interested in the directory columns that are referenced in the filter condition. not
+                // the SELECT list or other parts of the query.
+                Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount);
+                String[] parts = p.getLeft();
+                int tmpIndex = p.getRight();
+                if (spInfo == null) {
+                  for (int j = 0; j <= tmpIndex; j++) {
+                    if (parts[j] == null) { // prefixes should be non-null
+                      isSinglePartition = false;
+                      break;
+                    }
+                  }
+                  spInfo = parts;
+                  maxIndex = tmpIndex;
+                } else if (maxIndex != tmpIndex) {
+                  isSinglePartition = false;
+                  break;
+                } else {
+                  // we only want to compare until the maxIndex inclusive since subsequent values would be null
+                  for (int j = 0; j <= maxIndex; j++) {
+                    if (!spInfo[j].equals(parts[j])) {
+                      isSinglePartition = false;
+                      break;
+                    }
+                  }
+                }
+              }
+              qualifiedCount++;
+            }
+            recordCount++;
+          }
+        } else {
+          // Inner loop: within each batch iterate over the PartitionLocations
+          for(PartitionLocation part: partitions){
+            if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
+              newPartitions.add(part);
+              qualifiedCount++;
+            }
+            recordCount++;
           }
-          recordCount++;
         }
         logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
         batchIndex++;
@@ -299,6 +355,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
 
       // handle the case all partitions are filtered out.
       boolean canDropFilter = true;
+      boolean wasAllPartitionsPruned = false;
+      String cacheFileRoot = null;
 
       if (newPartitions.isEmpty()) {
         assert firstLocation != null;
@@ -306,6 +364,16 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         // In such case, we should not drop filter.
         newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0));
         canDropFilter = false;
+        // NOTE: with DRILL-4530, the PruneScanRule may be called with only a list of
+        // directories first and the non-composite partition location will still return
+        // directories, not files.  So, additional processing is done depending on this flag
+        wasAllPartitionsPruned = true;
+        logger.info("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);
+
+        // set the cacheFileRoot appropriately
+        if (firstLocation.isCompositePartition()) {
+          cacheFileRoot = descriptor.getBaseTableLocation() + firstLocation.getCompositePartitionPath();
+        }
       }
 
       logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size());
@@ -320,7 +388,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
       condition = condition.accept(reverseVisitor);
       pruneCondition = pruneCondition.accept(reverseVisitor);
 
-      RelNode inputRel = descriptor.createTableScan(newPartitions);
+      if (checkForSingle && isSinglePartition && !wasAllPartitionsPruned) {
+        // if metadata cache file could potentially be used, then assign a proper cacheFileRoot
+        String path = "";
+        for (int j = 0; j <= maxIndex; j++) {
+          path += "/" + spInfo[j];
+        }
+        cacheFileRoot = descriptor.getBaseTableLocation() + path;
+      }
+
+      RelNode inputRel = descriptor.supportsSinglePartOptimization() ?
+          descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
+            descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);
 
       if (projectRel != null) {
         inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
@@ -340,6 +419,25 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     }
   }
 
+  private Pair<String[], Integer> composePartition(BitSet referencedDirsBitSet,
+      Map<Integer, Integer> partitionMap,
+      ValueVector[] vectors,
+      int recordCount) {
+    String[] partition = new String[vectors.length];
+    int maxIndex = -1;
+    for (int referencedDirsIndex : BitSets.toIter(referencedDirsBitSet)) {
+      int partitionColumnIndex = partitionMap.get(referencedDirsIndex);
+      ValueVector vv = vectors[partitionColumnIndex];
+      if (vv.getAccessor().getValueCount() > 0 &&
+          vv.getAccessor().getObject(recordCount) != null) {
+        String value = vv.getAccessor().getObject(recordCount).toString();
+        partition[partitionColumnIndex] = value;
+        maxIndex = Math.max(maxIndex, partitionColumnIndex);
+      }
+    }
+    return Pair.of(partition, maxIndex);
+  }
+
   protected LogicalExpression materializePruneExpr(RexNode pruneCondition,
       PlannerSettings settings,
       RelNode scanRel,

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index b6e767e..e03cf22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -760,7 +760,6 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
     }
   }
 
-
   private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
     if (parent.isDir()) {
       Path pattern = new Path(parent.getPath(), "*");

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 5b4813a..d357c39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -32,6 +31,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -47,16 +47,28 @@ public class FileSelection {
   private List<FileStatus> statuses;
 
   public List<String> files;
+  /**
+   * root path for the selections
+   */
   public final String selectionRoot;
+  /**
+   * root path for the metadata cache file (if any)
+   */
+  public final String cacheFileRoot;
 
   private enum StatusType {
     NOT_CHECKED,         // initial state
     NO_DIRS,             // no directories in this selection
     HAS_DIRS,            // directories were found in the selection
-    EXPANDED             // whether this selection has been expanded to files
+    EXPANDED_FULLY,      // whether selection fully expanded to files
+    EXPANDED_PARTIAL     // whether selection partially expanded to only directories (not files)
   }
 
   private StatusType dirStatus;
+  // whether this selection previously had a wildcard
+  private boolean hadWildcard = false;
+  // whether all partitions were previously pruned for this selection
+  private boolean wasAllPartitionsPruned = false;
 
   /**
    * Creates a {@link FileSelection selection} out of given file statuses/files and selection root.
@@ -66,10 +78,22 @@ public class FileSelection {
    * @param selectionRoot  root path for selections
    */
   public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
+    this(statuses, files, selectionRoot, null, false, StatusType.NOT_CHECKED);
+  }
+
+  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
+      final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
+    this(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned, StatusType.NOT_CHECKED);
+  }
+
+  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
+      final String cacheFileRoot, final boolean wasAllPartitionsPruned, final StatusType dirStatus) {
     this.statuses = statuses;
     this.files = files;
     this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
-    this.dirStatus = StatusType.NOT_CHECKED;
+    this.dirStatus = dirStatus;
+    this.cacheFileRoot = cacheFileRoot;
+    this.wasAllPartitionsPruned = wasAllPartitionsPruned;
   }
 
   /**
@@ -81,6 +105,9 @@ public class FileSelection {
     this.files = selection.files;
     this.selectionRoot = selection.selectionRoot;
     this.dirStatus = selection.dirStatus;
+    this.cacheFileRoot = selection.cacheFileRoot;
+    this.hadWildcard = selection.hadWildcard;
+    this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
   }
 
   public String getSelectionRoot() {
@@ -128,7 +155,7 @@ public class FileSelection {
   }
 
   public FileSelection minusDirectories(DrillFileSystem fs) throws IOException {
-    if (isExpanded()) {
+    if (isExpandedFully()) {
       return this;
     }
     Stopwatch timer = Stopwatch.createStarted();
@@ -152,7 +179,7 @@ public class FileSelection {
 
     // fileSel will be null if we query an empty folder
     if (fileSel != null) {
-      fileSel.setExpanded();
+      fileSel.setExpandedFully();
     }
 
     return fileSel;
@@ -162,12 +189,28 @@ public class FileSelection {
     return getStatuses(fs).get(0);
   }
 
-  public void setExpanded() {
-    this.dirStatus = StatusType.EXPANDED;
+  public void setExpandedFully() {
+    this.dirStatus = StatusType.EXPANDED_FULLY;
+  }
+
+  public boolean isExpandedFully() {
+    return dirStatus == StatusType.EXPANDED_FULLY;
+  }
+
+  public void setExpandedPartial() {
+    this.dirStatus = StatusType.EXPANDED_PARTIAL;
+  }
+
+  public boolean isExpandedPartial() {
+    return dirStatus == StatusType.EXPANDED_PARTIAL;
   }
 
-  public boolean isExpanded() {
-    return dirStatus == StatusType.EXPANDED;
+  public StatusType getDirStatus() {
+    return dirStatus;
+  }
+
+  public boolean wasAllPartitionsPruned() {
+    return this.wasAllPartitionsPruned;
   }
 
   private static String commonPath(final List<FileStatus> statuses) {
@@ -229,13 +272,16 @@ public class FileSelection {
 
   public static FileSelection create(final DrillFileSystem fs, final String parent, final String path) throws IOException {
     Stopwatch timer = Stopwatch.createStarted();
+    boolean hasWildcard = path.contains(WILD_CARD);
+
     final Path combined = new Path(parent, removeLeadingSlash(path));
-    final FileStatus[] statuses = fs.globStatus(combined);
+    final FileStatus[] statuses = fs.globStatus(combined); // note: this would expand wildcards
     if (statuses == null) {
       return null;
     }
     final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().toString());
     logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
+    fileSel.setHadWildcard(hasWildcard);
     return fileSel;
 
   }
@@ -246,13 +292,14 @@ public class FileSelection {
    * @param statuses  list of file statuses
    * @param files  list of files
    * @param root  root path for selections
-   *
+   * @param cacheFileRoot root path for metadata cache (null for no metadata cache)
    * @return  null if creation of {@link FileSelection} fails with an {@link IllegalArgumentException}
    *          otherwise a new selection.
    *
    * @see FileSelection#FileSelection(List, List, String)
    */
-  public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root) {
+  public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root,
+      final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
     final boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
     final boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);
 
@@ -272,7 +319,39 @@ public class FileSelection {
       final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
       selectionRoot = path.toString();
     }
-    return new FileSelection(statuses, files, selectionRoot);
+    return new FileSelection(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned);
+  }
+
+  public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root) {
+    return FileSelection.create(statuses, files, root, null, false);
+  }
+
+  public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection) {
+    final String root = selection.getSelectionRoot();
+    if (Strings.isNullOrEmpty(root)) {
+      throw new DrillRuntimeException("Selection root is null or empty" + root);
+    }
+    if (dirPaths == null || dirPaths.isEmpty()) {
+      throw new DrillRuntimeException("List of directories is null or empty");
+    }
+
+    List<String> dirs = Lists.newArrayList();
+
+    if (selection.hadWildcard()) { // for wildcard the directory list should have already been expanded
+      for (FileStatus status : selection.getFileStatuses()) {
+        dirs.add(status.getPath().toString());
+      }
+    } else {
+      for (String s : dirPaths) {
+        dirs.add(s);
+      }
+    }
+
+    final Path rootPath = handleWildCard(root);
+    // final URI uri = dirPaths.get(0).toUri();
+    final URI uri = selection.getFileStatuses().get(0).getPath().toUri();
+    final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
+    return new FileSelection(null, dirs, path.toString());
   }
 
   private static Path handleWildCard(final String root) {
@@ -300,7 +379,20 @@ public class FileSelection {
   }
 
   public boolean supportDirPrunig() {
-    return isExpanded(); // currently we only support pruning if the directories have been expanded (this may change in the future)
+    if (isExpandedFully() || isExpandedPartial()) {
+      if (!wasAllPartitionsPruned) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void setHadWildcard(boolean wc) {
+    this.hadWildcard = wc;
+  }
+
+  public boolean hadWildcard() {
+    return this.hadWildcard;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index d7d31e5..45f7ca2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -48,6 +48,7 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.codehaus.jackson.annotate.JsonIgnore;
+import org.apache.commons.lang3.tuple.Pair;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
@@ -76,6 +77,7 @@ public class Metadata {
 
   public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
   public static final String METADATA_FILENAME = ".drill.parquet_metadata";
+  public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
 
   private final FileSystem fs;
 
@@ -132,6 +134,11 @@ public class Metadata {
     return metadata.readBlockMeta(path);
   }
 
+  public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    return metadata.readMetadataDirs(path);
+  }
+
   private Metadata(FileSystem fs) {
     this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
   }
@@ -142,7 +149,8 @@ public class Metadata {
    * @param path
    * @throws IOException
    */
-  private ParquetTableMetadata_v2 createMetaFilesRecursively(final String path) throws IOException {
+  private Pair<ParquetTableMetadata_v2, ParquetTableMetadataDirs>
+  createMetaFilesRecursively(final String path) throws IOException {
     List<ParquetFileMetadata_v2> metaDataList = Lists.newArrayList();
     List<String> directoryList = Lists.newArrayList();
     ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfoSet =
@@ -155,7 +163,7 @@ public class Metadata {
 
     for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) {
       if (file.isDirectory()) {
-        ParquetTableMetadata_v2 subTableMetadata = createMetaFilesRecursively(file.getPath().toString());
+        ParquetTableMetadata_v2 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString())).getLeft();
         metaDataList.addAll(subTableMetadata.files);
         directoryList.addAll(subTableMetadata.directories);
         directoryList.add(file.getPath().toString());
@@ -187,7 +195,14 @@ public class Metadata {
       fs.delete(new Path(p, oldname), false);
     }
     writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME));
-    return parquetTableMetadata;
+
+    if (directoryList.size() > 0 && childFiles.size() == 0) {
+      ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
+      writeFile(parquetTableMetadataDirs, new Path(p, METADATA_DIRECTORIES_FILENAME));
+      return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
+    }
+    List<String> emptyDirList = Lists.newArrayList();
+    return Pair.of(parquetTableMetadata, new ParquetTableMetadataDirs(emptyDirList));
   }
 
   /**
@@ -418,6 +433,19 @@ public class Metadata {
     os.close();
   }
 
+  private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, Path p) throws IOException {
+    JsonFactory jsonFactory = new JsonFactory();
+    jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
+    jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+    ObjectMapper mapper = new ObjectMapper(jsonFactory);
+    SimpleModule module = new SimpleModule();
+    mapper.registerModule(module);
+    FSDataOutputStream os = fs.create(p);
+    mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs);
+    os.flush();
+    os.close();
+  }
+
   /**
    * Read the parquet metadata from a file
    *
@@ -447,11 +475,38 @@ public class Metadata {
     timer.stop();
     if (tableModified(parquetTableMetadata, p)) {
       parquetTableMetadata =
-          createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString());
+          (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
     }
     return parquetTableMetadata;
   }
 
+  private ParquetTableMetadataDirs readMetadataDirs(String path) throws IOException {
+    Stopwatch timer = Stopwatch.createStarted();
+    Path p = new Path(path);
+    ObjectMapper mapper = new ObjectMapper();
+
+    final SimpleModule serialModule = new SimpleModule();
+    serialModule.addDeserializer(SchemaPath.class, new SchemaPath.De());
+
+    AfterburnerModule module = new AfterburnerModule();
+    module.setUseOptimizedBeanDeserializer(true);
+
+    mapper.registerModule(serialModule);
+    mapper.registerModule(module);
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    FSDataInputStream is = fs.open(p);
+
+    ParquetTableMetadataDirs parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
+    logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+    timer.stop();
+
+    if (tableModified(parquetTableMetadataDirs, p)) {
+      parquetTableMetadataDirs =
+          (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
+    }
+    return parquetTableMetadataDirs;
+  }
+
   /**
    * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
    * the modification time of the metadata file
@@ -477,6 +532,22 @@ public class Metadata {
     return false;
   }
 
+  private boolean tableModified(ParquetTableMetadataDirs tableMetadataDirs, Path metaFilePath)
+      throws IOException {
+    long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
+    FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
+    if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+      return true;
+    }
+    for (String directory : tableMetadataDirs.getDirectories()) {
+      directoryStatus = fs.getFileStatus(new Path(directory));
+      if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "metadata_version")
   @JsonSubTypes({
       @JsonSubTypes.Type(value = ParquetTableMetadata_v1.class, name="v1"),
@@ -535,7 +606,22 @@ public class Metadata {
     public abstract OriginalType getOriginalType();
   }
 
+  public static class ParquetTableMetadataDirs {
+    @JsonProperty List<String> directories;
+
+    public ParquetTableMetadataDirs() {
+      // default constructor needed for deserialization
+    }
 
+    public ParquetTableMetadataDirs(List<String> directories) {
+      this.directories = directories;
+    }
+
+    @JsonIgnore public List<String> getDirectories() {
+      return directories;
+    }
+
+  }
 
   @JsonTypeName("v1")
   public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase {

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index bf2e797..20c7312 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -50,6 +50,7 @@ import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MagicString;
 import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataDirs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -164,7 +165,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
   @Override
   public ParquetGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
       throws IOException {
-    return new ParquetGroupScan(userName, selection, this, selection.selectionRoot, columns);
+    return new ParquetGroupScan(userName, selection, this, selection.selectionRoot, selection.cacheFileRoot, columns);
   }
 
   @Override
@@ -207,9 +208,21 @@ public class ParquetFormatPlugin implements FormatPlugin{
     public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
         FileSystemPlugin fsPlugin, String storageEngineName, String userName)
         throws IOException {
-      // TODO: we only check the first file for directory reading.
-      if(selection.containsDirectories(fs)){
-        if(isDirReadable(fs, selection.getFirstPath(fs))){
+      if(selection.containsDirectories(fs)) {
+        Path dirMetaPath = new Path(selection.getSelectionRoot(), Metadata.METADATA_DIRECTORIES_FILENAME);
+        // check if the metadata 'directories' file exists; if it does, there is an implicit assumption that
+        // the directory is readable since the metadata 'directories' file cannot be created otherwise.  Note
+        // that isDirReadable() does a similar check with the metadata 'cache' file.
+        if (fs.exists(dirMetaPath)) {
+          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString());
+          if (mDirs.getDirectories().size() > 0) {
+            FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection);
+            dirSelection.setExpandedPartial();
+            return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
+                new FormatSelection(plugin.getConfig(), dirSelection));
+          }
+        }
+        if(isDirReadable(fs, selection.getFirstPath(fs))) {
           return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
               new FormatSelection(plugin.getConfig(), selection));
         }

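The setExpandedPartial() call above marks the selection as directory-expanded but not yet file-expanded, which is what lets ParquetGroupScan later defer file listing to the (possibly pruned) cache file. Illustrative sketch only; the real status enum lives in FileSelection and is implied here by the setExpandedPartial()/setExpandedFully()/isExpandedFully() calls in this patch:

    enum StatusType {
      NOT_EXPANDED,     // initial selection; directories and files not yet resolved
      EXPANDED_FULLY,   // individual files have been listed
      EXPANDED_PARTIAL  // directories resolved from the directories cache; files still pending
    }
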
http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 5950b74..b838472 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -111,12 +111,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private List<SchemaPath> columns;
   private ListMultimap<Integer, RowGroupInfo> mappings;
   private List<RowGroupInfo> rowGroupInfos;
-  /**
-   * The parquet table metadata may have already been read
-   * from a metadata cache file earlier; we can re-use during
-   * the ParquetGroupScan and avoid extra loading time.
-   */
   private Metadata.ParquetTableMetadataBase parquetTableMetadata = null;
+  private String cacheFileRoot = null;
 
   /*
    * total number of rows (obtained from parquet footer)
@@ -135,7 +131,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       @JsonProperty("format") FormatPluginConfig formatConfig, //
       @JacksonInject StoragePluginRegistry engineRegistry, //
       @JsonProperty("columns") List<SchemaPath> columns, //
-      @JsonProperty("selectionRoot") String selectionRoot //
+      @JsonProperty("selectionRoot") String selectionRoot, //
+      @JsonProperty("cacheFileRoot") String cacheFileRoot //
   ) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName));
     this.columns = columns;
@@ -150,6 +147,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
+    this.cacheFileRoot = cacheFileRoot;
 
     init();
   }
@@ -159,6 +157,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       FileSelection selection, //
       ParquetFormatPlugin formatPlugin, //
       String selectionRoot,
+      String cacheFileRoot,
       List<SchemaPath> columns) //
       throws IOException {
     super(userName);
@@ -168,13 +167,13 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.fs = ImpersonationUtil.createFileSystem(userName, formatPlugin.getFsConf());
 
     this.selectionRoot = selectionRoot;
+    this.cacheFileRoot = cacheFileRoot;
 
     final FileSelection fileSelection = expandIfNecessary(selection);
 
     this.entries = Lists.newArrayList();
-    final List<FileStatus> files = fileSelection.getStatuses(fs);
-    for (FileStatus file : files) {
-      entries.add(new ReadEntryWithPath(file.getPath().toString()));
+    for (String fileName : fileSelection.getFiles()) {
+      entries.add(new ReadEntryWithPath(fileName));
     }
 
     init();
@@ -201,6 +200,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
     this.usedMetadataCache = that.usedMetadataCache;
     this.parquetTableMetadata = that.parquetTableMetadata;
+    this.cacheFileRoot = that.cacheFileRoot;
   }
 
   /**
@@ -213,16 +213,18 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    * @throws IOException
    */
   private FileSelection expandIfNecessary(FileSelection selection) throws IOException {
-    if (selection.isExpanded()) {
+    if (selection.isExpandedFully()) {
       return selection;
     }
 
-    Path metaFilePath = new Path(selection.getSelectionRoot(), Metadata.METADATA_FILENAME);
+    // use the cacheFileRoot if provided (e.g. after partition pruning)
+    Path metaFilePath = new Path(cacheFileRoot != null ? cacheFileRoot : selectionRoot, Metadata.METADATA_FILENAME);
     if (!fs.exists(metaFilePath)) { // no metadata cache
       return selection;
     }
 
-    return initFromMetadataCache(selection, metaFilePath);
+    FileSelection expandedSelection = initFromMetadataCache(selection, metaFilePath);
+    return expandedSelection;
   }
 
   public List<ReadEntryWithPath> getEntries() {
@@ -549,7 +551,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
   }
 
-
   /**
    * Create and return a new file selection based on reading the metadata cache file.
    *
@@ -570,16 +571,29 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     // get (and set internal field) the metadata for the directory by reading the metadata file
     this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
-    List<String> fileNames = Lists.newArrayList();
     List<FileStatus> fileStatuses = selection.getStatuses(fs);
 
+    if (fileSet == null) {
+      fileSet = Sets.newHashSet();
+    }
+
     final Path first = fileStatuses.get(0).getPath();
     if (fileStatuses.size() == 1 && selection.getSelectionRoot().equals(first.toString())) {
       // we are selecting all files from selection root. Expand the file list from the cache
       for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
-        fileNames.add(file.getPath());
+        fileSet.add(file.getPath());
+      }
+
+    } else if (selection.isExpandedPartial() && cacheFileRoot != null) {
+      if (selection.wasAllPartitionsPruned()) {
+        // if all partitions were previously pruned, we only need to read 1 file (for the schema)
+        fileSet.add(this.parquetTableMetadata.getFiles().get(0).getPath());
+      } else {
+        for (Metadata.ParquetFileMetadata file : this.parquetTableMetadata.getFiles()) {
+          fileSet.add(file.getPath());
+        }
       }
-      // we don't need to populate fileSet as all files are selected
     } else {
       // we need to expand the files from fileStatuses
       for (FileStatus status : fileStatuses) {
@@ -588,25 +602,24 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
           final Path metaPath = new Path(status.getPath(), Metadata.METADATA_FILENAME);
           final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString());
           for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
-            fileNames.add(file.getPath());
+            fileSet.add(file.getPath());
           }
         } else {
           final Path path = Path.getPathWithoutSchemeAndAuthority(status.getPath());
-          fileNames.add(path.toString());
+          fileSet.add(path.toString());
         }
       }
-
-      // populate fileSet so we only keep the selected row groups
-      fileSet = Sets.newHashSet(fileNames);
     }
 
-    if (fileNames.isEmpty()) {
+    if (fileSet.isEmpty()) {
       // no files were found, most likely we tried to query some empty sub folders
       throw UserException.validationError().message("The table you tried to query is empty").build(logger);
     }
 
-    // when creating the file selection, set the selection root in the form /a/b instead of
-    // file:/a/b.  The reason is that the file names above have been created in the form
+    List<String> fileNames = Lists.newArrayList(fileSet);
+
+    // when creating the file selection, set the selection root without the URI prefix.
+    // The reason is that the file names above have been created in the form
     // /a/b/c.parquet and the format of the selection root must match that of the file names
     // otherwise downstream operations such as partition pruning can break.
     final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(selection.getSelectionRoot()));
@@ -616,14 +629,15 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     // because create() changes the root to include the scheme and authority; In future, if create()
     // is the preferred way to instantiate a file selection, we may need to do something different...
     // WARNING: file statuses and file names are inconsistent
-    FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString());
+    FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString(),
+        cacheFileRoot, selection.wasAllPartitionsPruned());
 
-    newSelection.setExpanded();
+    newSelection.setExpandedFully();
     return newSelection;
   }
 
   private void init() throws IOException {
-    if (entries.size() == 1) {
+    if (entries.size() == 1 && parquetTableMetadata == null) {
       Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
       Path metaPath = null;
       if (fs.isDirectory(p)) {
@@ -633,9 +647,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       }
       if (metaPath != null && fs.exists(metaPath)) {
         usedMetadataCache = true;
-        if (parquetTableMetadata == null) {
-          parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
-        }
+        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
       } else {
         parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString());
       }
@@ -837,10 +849,20 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
   @Override
   public String toString() {
+    String cacheFileString = "";
+    if (usedMetadataCache) {
+      // For EXPLAIN, remove the URI prefix from cacheFileRoot.  If cacheFileRoot is null, we
+      // would have read the cache file from selectionRoot
+      String str = (cacheFileRoot == null) ?
+          Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString() :
+          Path.getPathWithoutSchemeAndAuthority(new Path(cacheFileRoot)).toString();
+      cacheFileString = ", cacheFileRoot=" + str;
+    }
     return "ParquetGroupScan [entries=" + entries
         + ", selectionRoot=" + selectionRoot
         + ", numFiles=" + getEntries().size()
         + ", usedMetadataFile=" + usedMetadataCache
+        + cacheFileString
         + ", columns=" + columns + "]";
   }
 
@@ -855,6 +877,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   public FileGroupScan clone(FileSelection selection) throws IOException {
     ParquetGroupScan newScan = new ParquetGroupScan(this);
     newScan.modifyFileSelection(selection);
+    newScan.cacheFileRoot = selection.cacheFileRoot;
     newScan.init();
     return newScan;
   }
@@ -893,7 +916,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
 
     try {
-      FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot());
+      FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot(), cacheFileRoot, false);
       logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size());
       return this.clone(newSelection);
     } catch (IOException e) {

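Given the toString() change above, EXPLAIN output for a scan that used the metadata cache now surfaces the pruned cache-file root. An illustrative plan fragment for a single-partition query (paths hypothetical, entries elided):

    ParquetGroupScan [entries=[...], selectionRoot=/tmp/parquetTable2, numFiles=2,
        usedMetadataFile=true, cacheFileRoot=/tmp/parquetTable2/1995/Q3,
        columns=[`dir0`, `dir1`, `o_custkey`, `o_orderdate`]]
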
http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index afcea87..dae8694 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -34,43 +34,53 @@ import static org.junit.Assert.assertEquals;
 public class TestParquetMetadataCache extends PlanTestBase {
   private static final String WORKING_PATH = TestTools.getWorkingPath();
   private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
-  private static final String tableName = "parquetTable";
+  private static final String tableName1 = "parquetTable1";
+  private static final String tableName2 = "parquetTable2";
 
 
   @BeforeClass
   public static void copyData() throws Exception {
     // copy the data into the temporary location
     String tmpLocation = getDfsTestTmpSchemaLocation();
-    File dataDir = new File(tmpLocation + Path.SEPARATOR + tableName);
-    dataDir.mkdir();
+    File dataDir1 = new File(tmpLocation + Path.SEPARATOR + tableName1);
+    dataDir1.mkdir();
     FileUtils.copyDirectory(new File(String.format(String.format("%s/multilevel/parquet", TEST_RES_PATH))),
-        dataDir);
+        dataDir1);
+
+    File dataDir2 = new File(tmpLocation + Path.SEPARATOR + tableName2);
+    dataDir2.mkdir();
+    FileUtils.copyDirectory(new File(String.format("%s/multilevel/parquet2", TEST_RES_PATH)),
+        dataDir2);
   }
 
-  @Test
+  @Test // also a negative test case for DRILL-4530
   public void testPartitionPruningWithMetadataCache_1() throws Exception {
-    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName));
-    checkForMetadataFile(tableName);
+    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName1));
+    checkForMetadataFile(tableName1);
     String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
-            " where dir0=1994 and dir1='Q1'",
-        getDfsTestTmpSchemaLocation(), tableName);
-    int expectedRowCount = 10;
-    int expectedNumFiles = 1;
+            " where dir0=1994 and dir1 in ('Q1', 'Q2')",
+        getDfsTestTmpSchemaLocation(), tableName1);
+    int expectedRowCount = 20;
+    int expectedNumFiles = 2;
 
     int actualRowCount = testSql(query);
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
-    PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern}, new String[] {"Filter"});
+    // since there are 2 or more sub-partitions, the single partition cache file optimization does not apply
+    // and cacheFileRoot should point to the top level selectionRoot
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), tableName1);
+    PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+        new String[] {"Filter"});
   }
 
-  @Test // DRILL-3917
+  @Test // DRILL-3917, positive test case for DRILL-4530
   public void testPartitionPruningWithMetadataCache_2() throws Exception {
-    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName));
-    checkForMetadataFile(tableName);
+    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName1));
+    checkForMetadataFile(tableName1);
     String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
             " where dir0=1994",
-        getDfsTestTmpSchemaLocation(), tableName);
+        getDfsTestTmpSchemaLocation(), tableName1);
     int expectedRowCount = 40;
     int expectedNumFiles = 4;
 
@@ -78,7 +88,9 @@ public class TestParquetMetadataCache extends PlanTestBase {
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
-    PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern}, new String[] {"Filter"});
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1994", getDfsTestTmpSchemaLocation(), tableName1);
+    PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+        new String[] {"Filter"});
   }
 
   @Test // DRILL-3937 (partitioning column is varchar)
@@ -98,8 +110,8 @@ public class TestParquetMetadataCache extends PlanTestBase {
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
-
-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern}, new String[] {});
+    testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
+        new String[] {});
   }
 
   @Test // DRILL-3937 (partitioning column is binary using convert_to)
@@ -205,9 +217,98 @@ public class TestParquetMetadataCache extends PlanTestBase {
         .go();
   }
 
+  @Test // DRILL-4530  // single leaf level partition
+  public void testDrill4530_1() throws Exception {
+    // create metadata cache
+    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName2));
+    checkForMetadataFile(tableName2);
+
+    // run query and check correctness
+    String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
+            " where dir0=1995 and dir1='Q3'",
+        getDfsTestTmpSchemaLocation(), tableName2);
+    int expectedRowCount = 20;
+    int expectedNumFiles = 2;
+
+    int actualRowCount = testSql(query1);
+    assertEquals(expectedRowCount, actualRowCount);
+    String numFilesPattern = "numFiles=" + expectedNumFiles;
+    String usedMetaPattern = "usedMetadataFile=true";
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1995/Q3", getDfsTestTmpSchemaLocation(), tableName2);
+    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+        new String[] {"Filter"});
+  }
+
+  @Test // DRILL-4530  // single non-leaf level partition
+  public void testDrill4530_2() throws Exception {
+    // create metadata cache
+    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName2));
+    checkForMetadataFile(tableName2);
+
+    // run query and check correctness
+    String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
+            " where dir0=1995",
+        getDfsTestTmpSchemaLocation(), tableName2);
+    int expectedRowCount = 80;
+    int expectedNumFiles = 8;
+
+    int actualRowCount = testSql(query1);
+    assertEquals(expectedRowCount, actualRowCount);
+    String numFilesPattern = "numFiles=" + expectedNumFiles;
+    String usedMetaPattern = "usedMetadataFile=true";
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1995", getDfsTestTmpSchemaLocation(), tableName2);
+    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+        new String[] {"Filter"});
+  }
+
+  @Test // DRILL-4530  // only dir1 filter is present, no dir0, hence this maps to multiple partitions
+  public void testDrill4530_3() throws Exception {
+    // create metadata cache
+    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName2));
+    checkForMetadataFile(tableName2);
+
+    // run query and check correctness
+    String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
+            " where dir1='Q3'",
+        getDfsTestTmpSchemaLocation(), tableName2);
+    int expectedRowCount = 40;
+    int expectedNumFiles = 4;
+
+    int actualRowCount = testSql(query1);
+    assertEquals(expectedRowCount, actualRowCount);
+    String numFilesPattern = "numFiles=" + expectedNumFiles;
+    String usedMetaPattern = "usedMetadataFile=true";
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), tableName2);
+    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+        new String[] {"Filter"});
+  }
+
+  @Test // DRILL-4530  // non-existent partition (1 subdirectory's cache file will still be read for schema)
+  public void testDrill4530_4() throws Exception {
+    // create metadata cache
+    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName2));
+    checkForMetadataFile(tableName2);
+
+    // run query and check correctness
+    String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
+            " where dir0=1995 and dir1='Q6'",
+        getDfsTestTmpSchemaLocation(), tableName2);
+    int expectedRowCount = 0;
+    int expectedNumFiles = 1;
+
+    int actualRowCount = testSql(query1);
+    assertEquals(expectedRowCount, actualRowCount);
+    String numFilesPattern = "numFiles=" + expectedNumFiles;
+    String usedMetaPattern = "usedMetadataFile=true";
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/*/*", getDfsTestTmpSchemaLocation(), tableName2);
+    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+        new String[] {});
+  }
+
   private void checkForMetadataFile(String table) throws Exception {
     String tmpDir = getDfsTestTmpSchemaLocation();
     String metaFile = Joiner.on("/").join(tmpDir, table, Metadata.METADATA_FILENAME);
     Assert.assertTrue(Files.exists(new File(metaFile).toPath()));
   }
+
 }

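For reference, the SQL equivalent of testDrill4530_1 (the schema name and path come from the test fixtures above; adjust for your environment):

    refresh table metadata dfs_test.`/tmp/parquetTable2`;
    explain plan for
    select dir0, dir1, o_custkey, o_orderdate
    from dfs_test.`/tmp/parquetTable2`
    where dir0 = 1995 and dir1 = 'Q3';

The plan should show numFiles=2, usedMetadataFile=true, cacheFileRoot=.../parquetTable2/1995/Q3, and no Filter operator. When the filter prunes every partition (testDrill4530_4), cacheFileRoot instead shows a wildcarded path such as .../parquetTable2/*/*, and a single file is still read to supply the schema.
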
http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q1/1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q1/1.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q1/1.parquet
new file mode 100644
index 0000000..b4abe60
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q1/1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q1/2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q1/2.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q1/2.parquet
new file mode 100644
index 0000000..b4abe60
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q1/2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q2/1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q2/1.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q2/1.parquet
new file mode 100644
index 0000000..f5338af
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q2/1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q2/2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q2/2.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q2/2.parquet
new file mode 100644
index 0000000..f5338af
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q2/2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q3/1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q3/1.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q3/1.parquet
new file mode 100644
index 0000000..16cb2c4
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q3/1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q3/2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q3/2.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q3/2.parquet
new file mode 100644
index 0000000..16cb2c4
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q3/2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q4/1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q4/1.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q4/1.parquet
new file mode 100644
index 0000000..bf0ed05
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q4/1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q4/2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q4/2.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q4/2.parquet
new file mode 100644
index 0000000..bf0ed05
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1994/Q4/2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q1/1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q1/1.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q1/1.parquet
new file mode 100644
index 0000000..93514c4
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q1/1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q1/2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q1/2.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q1/2.parquet
new file mode 100644
index 0000000..93514c4
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q1/2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q2/1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q2/1.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q2/1.parquet
new file mode 100644
index 0000000..e8ae33e
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q2/1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q2/2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q2/2.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q2/2.parquet
new file mode 100644
index 0000000..e8ae33e
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q2/2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q3/1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q3/1.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q3/1.parquet
new file mode 100644
index 0000000..aae46dd
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q3/1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q3/2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q3/2.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q3/2.parquet
new file mode 100644
index 0000000..aae46dd
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q3/2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q4/1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q4/1.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q4/1.parquet
new file mode 100644
index 0000000..bae64e3
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q4/1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/4f818d07/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q4/2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q4/2.parquet b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q4/2.parquet
new file mode 100644
index 0000000..bae64e3
Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquet2/1995/Q4/2.parquet differ

