drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adene...@apache.org
Subject [1/2] drill git commit: DRILL-4376: Wrong results when doing a count(*) on part of directories with metadata cache
Date Wed, 16 Mar 2016 17:10:10 GMT
Repository: drill
Updated Branches:
  refs/heads/master 050ff9679 -> 71608ca9f


DRILL-4376: Wrong results when doing a count(*) on part of directories with metadata cache


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/11fe8d7c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/11fe8d7c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/11fe8d7c

Branch: refs/heads/master
Commit: 11fe8d7cdb1df4100cd48bcce1de0b2c3c5f983a
Parents: 050ff96
Author: adeneche <adeneche@gmail.com>
Authored: Wed Mar 9 13:44:02 2016 +0100
Committer: adeneche <adeneche@gmail.com>
Committed: Wed Mar 16 16:53:23 2016 +0100

----------------------------------------------------------------------
 .../drill/exec/store/dfs/FileSelection.java     |  22 ++--
 .../exec/store/parquet/ParquetGroupScan.java    | 101 +++++++++++++------
 .../store/parquet/TestParquetMetadataCache.java |  19 ++++
 3 files changed, 103 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/11fe8d7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index b5b1d8f..dc49281 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -249,7 +249,7 @@ public class FileSelection {
    * @see FileSelection#FileSelection(List, List, String)
    */
   public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root) {
-    final boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() == 0);
+    final boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
     final boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);
 
     if (bothNonEmptySelection || bothEmptySelection) {
@@ -263,14 +263,7 @@ public class FileSelection {
       if (Strings.isNullOrEmpty(root)) {
         throw new DrillRuntimeException("Selection root is null or empty" + root);
       }
-      // Handle wild card
-      final Path rootPath;
-      if (root.contains(WILD_CARD)) {
-        final String newRoot = root.substring(0, root.indexOf(WILD_CARD));
-        rootPath = new Path(newRoot);
-      } else {
-        rootPath = new Path(root);
-      }
+      final Path rootPath = handleWildCard(root);
       final URI uri = statuses.get(0).getPath().toUri();
       final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
       selectionRoot = path.toString();
@@ -278,6 +271,17 @@ public class FileSelection {
     return new FileSelection(statuses, files, selectionRoot);
   }
 
+  private static Path handleWildCard(final String root) {
+    if (root.contains(WILD_CARD)) {
+      int idx = root.indexOf(WILD_CARD); // first wild card in the path
+      idx = root.lastIndexOf(PATH_SEPARATOR, idx); // file separator right before the first wild card
+      final String newRoot = root.substring(0, idx);
+      return new Path(newRoot);
+    } else {
+      return new Path(root);
+    }
+  }
+
   private static String removeLeadingSlash(String path) {
     if (path.charAt(0) == '/') {
       String newPath = path.substring(1);

http://git-wip-us.apache.org/repos/asf/drill/blob/11fe8d7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index ccfca41..774f515 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -177,17 +177,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     this.selectionRoot = selectionRoot;
 
-    FileSelection newSelection = null;
-    if (!selection.isExpanded()) {
-      // if metadata cache exists, do the expansion of selection using the metadata cache;
-      // otherwise let init() handle the expansion
-      FileStatus firstPath = selection.getFirstPath(fs);
-      Path p = new Path(firstPath.getPath(), Metadata.METADATA_FILENAME);
-      if (fs.exists(p)) {
-        newSelection = initFromMetadataCache(fs, selection);
-      }
-    }
-    FileSelection fileSelection = newSelection != null ? newSelection : selection;
+    final FileSelection fileSelection = expandIfNecessary(selection);
 
     this.entries = Lists.newArrayList();
     final List<FileStatus> files = fileSelection.getStatuses(fs);
@@ -221,6 +211,27 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.parquetTableMetadata = that.parquetTableMetadata;
   }
 
+  /**
+   * expands the selection's folders if metadata cache is found for the selection root.<br>
+   * If the selection has already been expanded or no metadata cache was found, does nothing
+   *
+   * @param selection actual selection before expansion
+   * @return new selection after expansion, if no expansion was done returns the input selection
+   *
+   * @throws IOException
+   */
+  private FileSelection expandIfNecessary(FileSelection selection) throws IOException {
+    if (selection.isExpanded()) {
+      return selection;
+    }
+
+    Path metaFilePath = new Path(selection.getSelectionRoot(), Metadata.METADATA_FILENAME);
+    if (!fs.exists(metaFilePath)) { // no metadata cache
+      return selection;
+    }
+
+    return initFromMetadataCache(selection, metaFilePath);
+  }
 
   public List<ReadEntryWithPath> getEntries() {
     return entries;
@@ -547,29 +558,65 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   }
 
 
-  // Create and return a new file selection based on reading the metadata cache file.
-  // This function also initializes a few of ParquetGroupScan's fields as appropriate.
+  /**
+   * Create and return a new file selection based on reading the metadata cache file.
+   *
+   * This function also initializes a few of ParquetGroupScan's fields as appropriate.
+   *
+   * @param selection initial file selection
+   * @param metaFilePath metadata cache file path
+   * @return file selection read from cache
+   *
+   * @throws IOException
+   */
   private FileSelection
-  initFromMetadataCache(DrillFileSystem fs, FileSelection selection) throws IOException {
-    FileStatus metaRootDir = selection.getFirstPath(fs);
-    Path metaFilePath = new Path(metaRootDir.getPath(), Metadata.METADATA_FILENAME);
+  initFromMetadataCache(FileSelection selection, Path metaFilePath) throws IOException {
+    // get the metadata for the root directory by reading the metadata file
+    // parquetTableMetadata contains the metadata for all files in the selection root folder, but we need to make sure
+    // we only select the files that are part of selection (by setting fileSet appropriately)
 
     // get (and set internal field) the metadata for the directory by reading the metadata file
     this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
     List<String> fileNames = Lists.newArrayList();
-    for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
-      fileNames.add(file.getPath());
+    List<FileStatus> fileStatuses = selection.getStatuses(fs);
+
+    if (fileStatuses.size() == 1 && fileStatuses.get(0).isDirectory()) {
+      // we are selecting all files from selection root. Expand the file list from the cache
+      for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+        fileNames.add(file.getPath());
+      }
+      // we don't need to populate fileSet as all files are selected
+    } else {
+      // we need to expand the files from fileStatuses
+      for (FileStatus status : fileStatuses) {
+        if (status.isDirectory()) {
+          //TODO read the metadata cache files in parallel
+          final Path metaPath = new Path(status.getPath(), Metadata.METADATA_FILENAME);
+          final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString());
+          for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
+            fileNames.add(file.getPath());
+          }
+        } else {
+          final Path path = Path.getPathWithoutSchemeAndAuthority(status.getPath());
+          fileNames.add(path.toString());
+        }
+      }
+
+      // populate fileSet so we only keep the selected row groups
+      fileSet = Sets.newHashSet(fileNames);
     }
+
     // when creating the file selection, set the selection root in the form /a/b instead of
     // file:/a/b.  The reason is that the file names above have been created in the form
     // /a/b/c.parquet and the format of the selection root must match that of the file names
     // otherwise downstream operations such as partition pruning can break.
-    final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(metaRootDir.getPath());
+    final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(selection.getSelectionRoot()));
     this.selectionRoot = metaRootPath.toString();
 
     // Use the FileSelection constructor directly here instead of the FileSelection.create() method
     // because create() changes the root to include the scheme and authority; In future, if create()
     // is the preferred way to instantiate a file selection, we may need to do something different...
+    // WARNING: file statuses and file names are inconsistent
     FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString());
 
     newSelection.setExpanded();
@@ -577,7 +624,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   }
 
   private void init() throws IOException {
-    List<FileStatus> fileStatuses = null;
     if (entries.size() == 1) {
       Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
       Path metaPath = null;
@@ -599,19 +645,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       Path metaPath = new Path(p, Metadata.METADATA_FILENAME);
       if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) {
         usedMetadataCache = true;
+        if (parquetTableMetadata == null) {
+          parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+        }
         if (fileSet != null) {
-          if (parquetTableMetadata == null) {
-            parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString()));
-          } else {
-            parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
-          }
-        } else {
-          if (parquetTableMetadata == null) {
-            parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
-          }
+          parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
         }
       } else {
-        fileStatuses = Lists.newArrayList();
+        final List<FileStatus> fileStatuses = Lists.newArrayList();
         for (ReadEntryWithPath entry : entries) {
           getFiles(entry.getPath(), fileStatuses);
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/11fe8d7c/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 4330c96..b41502e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -177,6 +177,25 @@ public class TestParquetMetadataCache extends PlanTestBase {
       .go();
   }
 
+  @Test
+  public void testFix4376() throws Exception {
+    // first create some parquet subfolders
+    runSQL("CREATE TABLE dfs_test.tmp.`4376`    AS SELECT * FROM cp.`employee.json` LIMIT 1");
+    runSQL("CREATE TABLE dfs_test.tmp.`4376/01` AS SELECT * FROM cp.`employee.json` LIMIT 2");
+    runSQL("CREATE TABLE dfs_test.tmp.`4376/02` AS SELECT * FROM cp.`employee.json` LIMIT 4");
+    runSQL("CREATE TABLE dfs_test.tmp.`4376/0`  AS SELECT * FROM cp.`employee.json` LIMIT 8");
+    runSQL("CREATE TABLE dfs_test.tmp.`4376/11` AS SELECT * FROM cp.`employee.json` LIMIT 16");
+    runSQL("CREATE TABLE dfs_test.tmp.`4376/12` AS SELECT * FROM cp.`employee.json` LIMIT 32");
+    // next, build the metadata cache file
+    runSQL("REFRESH TABLE METADATA dfs_test.tmp.`4376`");
+
+    testBuilder()
+      .sqlQuery("SELECT COUNT(*) AS `count` FROM dfs_test.tmp.`4376/0*`")
+      .ordered()
+      .baselineColumns("count").baselineValues(15L)
+      .go();
+    }
+
   private void checkForMetadataFile(String table) throws Exception {
     String tmpDir = getDfsTestTmpSchemaLocation();
     String metaFile = Joiner.on("/").join(tmpDir, table, Metadata.METADATA_FILENAME);


Mime
View raw message