hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sp...@apache.org
Subject hive git commit: HIVE-15879: Fix HiveMetaStoreChecker.checkPartitionDirs method (Vihang Karajgaonkar, reviewed by Rajesh Balamohan, Sergio Pena)
Date Tue, 28 Feb 2017 17:54:28 GMT
Repository: hive
Updated Branches:
  refs/heads/master 84fdc1c7c -> 95da916eb


HIVE-15879: Fix HiveMetaStoreChecker.checkPartitionDirs method (Vihang Karajgaonkar, reviewed
by Rajesh Balamohan, Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95da916e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95da916e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95da916e

Branch: refs/heads/master
Commit: 95da916eb249cc0351bf4b94ec6c00b77db03cf3
Parents: 84fdc1c
Author: Vihang Karajgaonkar <vihang@cloudera.com>
Authored: Tue Feb 28 11:53:34 2017 -0600
Committer: Sergio Pena <sergio.pena@cloudera.com>
Committed: Tue Feb 28 11:53:34 2017 -0600

----------------------------------------------------------------------
 .../hive/ql/metadata/HiveMetaStoreChecker.java  | 265 +++++++++++--------
 .../ql/metadata/TestHiveMetaStoreChecker.java   | 260 +++++++++++++++++-
 2 files changed, 408 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/95da916e/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 7c94c95..5d2ae2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -19,19 +19,20 @@ package org.apache.hadoop.hive.ql.metadata;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.slf4j.Logger;
@@ -403,16 +404,11 @@ public class HiveMetaStoreChecker {
    */
 
   private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws
IOException, HiveException {
-    ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
-    basePaths.add(basePath);
-    Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
-    // Here we just reuse the THREAD_COUNT configuration for
-    // HIVE_MOVE_FILES_THREAD_COUNT
     int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15);
 
     // Check if too low config is provided for move files. 2x CPU is reasonable max count.
     poolSize = poolSize == 0 ? poolSize : Math.max(poolSize,
-        Runtime.getRuntime().availableProcessors() * 2);
+        getMinPoolSize());
 
     // Fixed thread pool on need basis
     final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor)
@@ -421,135 +417,176 @@ public class HiveMetaStoreChecker {
 
     if (pool == null) {
       LOG.debug("Not-using threaded version of MSCK-GetPaths");
+      Queue<Path> basePaths = new LinkedList<>();
+      basePaths.add(basePath);
+      checkPartitionDirsSingleThreaded(basePaths, allDirs, basePath.getFileSystem(conf),
maxDepth,
+          maxDepth);
     } else {
-      LOG.debug("Using threaded version of MSCK-GetPaths with number of threads "
+      LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads "
           + pool.getMaximumPoolSize());
+      checkPartitionDirsInParallel((ThreadPoolExecutor) pool, basePath, allDirs,
+          basePath.getFileSystem(conf), maxDepth);
     }
-    checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth);
     if (pool != null) {
       pool.shutdown();
     }
-    allDirs.addAll(dirSet);
   }
 
-  // process the basePaths in parallel and then the next level of basePaths
-  private void checkPartitionDirs(final ThreadPoolExecutor pool,
-      final ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs,
-      final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException
{
-    final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
+  @VisibleForTesting
+  int getMinPoolSize() {
+    return Runtime.getRuntime().availableProcessors() * 2;
+  }
 
-    // Check if thread pool can be used.
-    boolean useThreadPool = false;
-    if (pool != null) {
-      synchronized (pool) {
-        // In case of recursive calls, it is possible to deadlock with TP. Check TP usage
here.
-        if (pool.getActiveCount() < pool.getMaximumPoolSize()) {
-          useThreadPool = true;
-        }
+  private final class PathDepthInfoCallable implements Callable<Path> {
+    private final int maxDepth;
+    private final FileSystem fs;
+    private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
+    private final boolean throwException;
+    private final PathDepthInfo pd;
+
+    private PathDepthInfoCallable(PathDepthInfo pd, int maxDepth, FileSystem fs,
+        ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
+      this.maxDepth = maxDepth;
+      this.pd = pd;
+      this.fs = fs;
+      this.pendingPaths = basePaths;
+      this.throwException = "throw"
+      .equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION));
+    }
 
-        if (!useThreadPool) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Not using threadPool as active count:" + pool.getActiveCount()
-                + ", max:" + pool.getMaximumPoolSize());
-          }
-        }
-      }
+    @Override
+    public Path call() throws Exception {
+      return processPathDepthInfo(pd);
     }
 
-    if (null == pool || !useThreadPool) {
-      for (final Path path : basePaths) {
-        FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
-        boolean fileFound = false;
-        for (FileStatus status : statuses) {
-          if (status.isDirectory()) {
-            nextLevel.add(status.getPath());
-          } else {
-            fileFound = true;
-          }
+    private Path processPathDepthInfo(final PathDepthInfo pd)
+        throws IOException, HiveException, InterruptedException {
+      final Path currentPath = pd.p;
+      final int currentDepth = pd.depth;
+      FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      // found no files under a sub-directory under table base path; it is possible that
the table
+      // is empty and hence there are no partition sub-directories created under base path
+      if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth
< maxDepth) {
+        // since maxDepth is not yet reached, we are missing partition
+        // columns in currentPath
+        if (throwException) {
+          throw new HiveException(
+              "MSCK is missing partition columns under " + currentPath.toString());
+        } else {
+          LOG.warn("MSCK is missing partition columns under " + currentPath.toString());
         }
-        if (depth != 0) {
-          // we are in the middle of the search and we find a file
-          if (fileFound) {
-            if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION)))
{
+      } else {
+        // found files under currentPath add them to the queue if it is a directory
+        for (FileStatus fileStatus : fileStatuses) {
+          if (!fileStatus.isDirectory() && currentDepth < maxDepth) {
+            // found a file at depth which is less than number of partition keys
+            if (throwException) {
               throw new HiveException(
-                  "MSCK finds a file rather than a folder when it searches for " + path.toString());
-            } else {
-              LOG.warn("MSCK finds a file rather than a folder when it searches for "
-                  + path.toString());
-            }
-          }
-          if (!nextLevel.isEmpty()) {
-            checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth);
-          } else if (depth != maxDepth) {
-            // since nextLevel is empty, we are missing partition columns.
-            if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION)))
{
-              throw new HiveException("MSCK is missing partition columns under " + path.toString());
+                  "MSCK finds a file rather than a directory when it searches for "
+                      + fileStatus.getPath().toString());
             } else {
-              LOG.warn("MSCK is missing partition columns under " + path.toString());
+              LOG.warn("MSCK finds a file rather than a directory when it searches for "
+                  + fileStatus.getPath().toString());
             }
+          } else if (fileStatus.isDirectory() && currentDepth < maxDepth) {
+            // add sub-directory to the work queue if maxDepth is not yet reached
+            pendingPaths.add(new PathDepthInfo(fileStatus.getPath(), currentDepth + 1));
           }
-        } else {
-          allDirs.add(path);
+        }
+        if (currentDepth == maxDepth) {
+          return currentPath;
         }
       }
-    } else {
-      final List<Future<Void>> futures = new LinkedList<>();
-      for (final Path path : basePaths) {
-        futures.add(pool.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
-            boolean fileFound = false;
-            for (FileStatus status : statuses) {
-              if (status.isDirectory()) {
-                nextLevel.add(status.getPath());
-              } else {
-                fileFound = true;
-              }
-            }
-            if (depth != 0) {
-              // we are in the middle of the search and we find a file
-              if (fileFound) {
-                if ("throw".equals(HiveConf.getVar(conf,
-                    HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
-                  throw new HiveException(
-                      "MSCK finds a file rather than a folder when it searches for "
-                          + path.toString());
-                } else {
-                  LOG.warn("MSCK finds a file rather than a folder when it searches for "
-                      + path.toString());
-                }
-              }
-              if (!nextLevel.isEmpty()) {
-                checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth);
-              } else if (depth != maxDepth) {
-                // since nextLevel is empty, we are missing partition columns.
-                if ("throw".equals(HiveConf.getVar(conf,
-                    HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
-                  throw new HiveException("MSCK is missing partition columns under "
-                      + path.toString());
-                } else {
-                  LOG.warn("MSCK is missing partition columns under " + path.toString());
-                }
-              }
-            } else {
-              allDirs.add(path);
-            }
-            return null;
+      return null;
+    }
+  }
+
+  private static class PathDepthInfo {
+    private final Path p;
+    private final int depth;
+    PathDepthInfo(Path p, int depth) {
+      this.p = p;
+      this.depth = depth;
+    }
+  }
+
+  private void checkPartitionDirsInParallel(final ThreadPoolExecutor pool,
+      final Path basePath, final Set<Path> result,
+      final FileSystem fs, final int maxDepth) throws HiveException {
+    try {
+      Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
+      ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
+      nextLevel.add(new PathDepthInfo(basePath, 0));
+      //Uses level parallel implementation of a bfs. Recursive DFS implementations
+      //have an issue where the number of threads can run out if the number of
+      //nested sub-directories is more than the pool size.
+      //Using a two queue implementation is simpler than one queue since then we will
+      //have to add the complex mechanisms to let the free worker threads know when new levels
are
+      //discovered using notify()/wait() mechanisms which can potentially lead to bugs if
+      //not done right
+      while(!nextLevel.isEmpty()) {
+        ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>();
+        //process each level in parallel
+        while(!nextLevel.isEmpty()) {
+          futures.add(
+              pool.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
+        }
+        while(!futures.isEmpty()) {
+          Path p = futures.poll().get();
+          if (p != null) {
+            result.add(p);
           }
-        }));
+        }
+        //update the nextlevel with newly discovered sub-directories from the above
+        nextLevel = tempQueue;
       }
-      for (Future<Void> future : futures) {
-        try {
-          future.get();
-        } catch (Exception e) {
-          LOG.error(e.getMessage());
-          pool.shutdownNow();
-          throw new HiveException(e.getCause());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error(e.getMessage());
+      pool.shutdownNow();
+      throw new HiveException(e.getCause());
+    }
+  }
+
+  /*
+   * Original recursive implementation works well for single threaded use-case but has limitations
+   * if we attempt to parallelize this directly
+   */
+  private void checkPartitionDirsSingleThreaded(Queue<Path> basePaths, final Set<Path>
allDirs,
+      final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException
{
+    final Queue<Path> nextLevel = new LinkedList<>();
+    for (final Path path : basePaths) {
+      FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      boolean fileFound = false;
+      for (FileStatus status : statuses) {
+        if (status.isDirectory()) {
+          nextLevel.add(status.getPath());
+        } else {
+          fileFound = true;
         }
       }
-      if (!nextLevel.isEmpty() && depth != 0) {
-        checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth);
+      if (depth != 0) {
+        // we are in the middle of the search and we find a file
+        if (fileFound) {
+          if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION)))
{
+            throw new HiveException(
+                "MSCK finds a file rather than a folder when it searches for " + path.toString());
+          } else {
+            LOG.warn("MSCK finds a file rather than a folder when it searches for "
+                + path.toString());
+          }
+        }
+        if (!nextLevel.isEmpty()) {
+          checkPartitionDirsSingleThreaded(nextLevel, allDirs, fs, depth - 1, maxDepth);
+        } else if (depth != maxDepth) {
+          // since nextLevel is empty, we are missing partition columns.
+          if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION)))
{
+            throw new HiveException("MSCK is missing partition columns under " + path.toString());
+          } else {
+            LOG.warn("MSCK is missing partition columns under " + path.toString());
+          }
+        }
+      } else {
+        allDirs.add(path);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/95da916e/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
index 35f52cd..f95c130 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.metadata;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,6 +30,7 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -37,8 +39,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.Shell;
 import org.apache.thrift.TException;
+import org.mockito.Mockito;
 
 /**
  * TestHiveMetaStoreChecker.
@@ -63,6 +65,8 @@ public class TestHiveMetaStoreChecker extends TestCase {
   protected void setUp() throws Exception {
     super.setUp();
     hive = Hive.get();
+    hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 15);
+    hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "throw");
     checker = new HiveMetaStoreChecker(hive);
 
     partCols = new ArrayList<FieldSchema>();
@@ -104,7 +108,6 @@ public class TestHiveMetaStoreChecker extends TestCase {
 
   public void testTableCheck() throws HiveException, MetaException,
       IOException, TException, AlreadyExistsException {
-
     CheckResult result = new CheckResult();
     checker.checkMetastore(dbName, null, null, result);
     // we haven't added anything so should return an all ok
@@ -194,7 +197,6 @@ public class TestHiveMetaStoreChecker extends TestCase {
 
   public void testPartitionsCheck() throws HiveException, MetaException,
       IOException, TException, AlreadyExistsException {
-
     Database db = new Database();
     db.setName(dbName);
     hive.createDatabase(db);
@@ -311,4 +313,256 @@ public class TestHiveMetaStoreChecker extends TestCase {
     hive.dropDatabase(dbName);
     assertFalse(fs.exists(fakeTable));
   }
+
+  /*
+   * Test multi-threaded implementation of checker to find out missing partitions
+   */
+  public void testPartitionsNotInMs() throws HiveException, AlreadyExistsException, IOException
{
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(10, result.getPartitionsNotInMs().size());
+  }
+
+  /*
+   * Tests single threaded implementation of checkMetastore
+   */
+  public void testSingleThreadedCheckMetastore()
+      throws HiveException, AlreadyExistsException, IOException {
+    // set num of threads to 0 so that single-threaded checkMetastore is called
+    hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(10, result.getPartitionsNotInMs().size());
+  }
+
+  /**
+   * Tests single threaded implementation for deeply nested partitioned tables
+   *
+   * @throws HiveException
+   * @throws AlreadyExistsException
+   * @throws IOException
+   */
+  public void testSingleThreadedDeeplyNestedTables()
+      throws HiveException, AlreadyExistsException, IOException {
+    // set num of threads to 0 so that single-threaded checkMetastore is called
+    hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+    // currently HiveMetastoreChecker uses a minimum pool size of 2*numOfProcs
+    // no other easy way to set it deterministically for this test case
+    checker = Mockito.spy(checker);
+    Mockito.when(checker.getMinPoolSize()).thenReturn(2);
+    int poolSize = checker.getMinPoolSize();
+    // create a deeply nested table which has more partition keys than the pool size
+    Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(10, result.getPartitionsNotInMs().size());
+  }
+
+  /**
+   * Tests the case when the number of partition keys is more than the threadpool size.
+   *
+   * @throws HiveException
+   * @throws AlreadyExistsException
+   * @throws IOException
+   */
+  public void testDeeplyNestedPartitionedTables()
+      throws HiveException, AlreadyExistsException, IOException {
+    // currently HiveMetastoreChecker uses a minimum pool size of 2*numOfProcs
+    // no other easy way to set it deterministically for this test case
+    int poolSize = checker.getMinPoolSize();
+    checker = Mockito.spy(checker);
+    Mockito.when(checker.getMinPoolSize()).thenReturn(2);
+    // create a deeply nested table which has more partition keys than the pool size
+    Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(10, result.getPartitionsNotInMs().size());
+  }
+
+  /**
+   * Test if checker throws HiveException when there is a dummy directory present in
the nested level
+   * of sub-directories
+   * @throws AlreadyExistsException
+   * @throws IOException
+   * @throws HiveException
+   */
+  public void testErrorForMissingPartitionColumn() throws AlreadyExistsException, IOException,
HiveException {
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    //create a fake directory to throw exception
+    StringBuilder sb = new StringBuilder(testTable.getDataLocation().toString());
+    sb.append(Path.SEPARATOR);
+    sb.append("dummyPart=error");
+    createDirectory(sb.toString());
+    //check result now
+    CheckResult result = new CheckResult();
+    try {
+      checker.checkMetastore(dbName, tableName, null, result);
+    } catch (Exception e) {
+      assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException);
+    }
+    createFile(sb.toString(), "dummyFile");
+    result = new CheckResult();
+    try {
+      checker.checkMetastore(dbName, tableName, null, result);
+    } catch (Exception e) {
+      assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException);
+    }
+  }
+
+  /*
+   * Test if single-threaded implementation checker throws HiveException when there is
a dummy
+   * directory present in the nested level
+   */
+  public void testErrorForMissingPartitionsSingleThreaded()
+      throws AlreadyExistsException, HiveException, IOException {
+    // set num of threads to 0 so that single-threaded checkMetastore is called
+    hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    // create a fake directory to throw exception
+    StringBuilder sb = new StringBuilder(testTable.getDataLocation().toString());
+    sb.append(Path.SEPARATOR);
+    sb.append("dummyPart=error");
+    createDirectory(sb.toString());
+    // check result now
+    CheckResult result = new CheckResult();
+    try {
+      checker.checkMetastore(dbName, tableName, null, result);
+    } catch (Exception e) {
+      assertTrue("Expected exception HiveException got " + e.getClass(),
+          e instanceof HiveException);
+    }
+    createFile(sb.toString(), "dummyFile");
+    result = new CheckResult();
+    try {
+      checker.checkMetastore(dbName, tableName, null, result);
+    } catch (Exception e) {
+      assertTrue("Expected exception HiveException got " + e.getClass(),
+          e instanceof HiveException);
+    }
+  }
+  /**
+   * Creates a test partitioned table with the required level of nested partitions and number
of
+   * partitions
+   *
+   * @param dbName - Database name
+   * @param tableName - Table name
+   * @param numOfPartKeys - Number of partition keys (nested levels of sub-directories in
base table
+   *          path)
+   * @param valuesPerPartition - If greater than 0 creates valuesPerPartition dummy partitions
+   * @return
+   * @throws AlreadyExistsException
+   * @throws HiveException
+   */
+  private Table createPartitionedTestTable(String dbName, String tableName, int numOfPartKeys,
+      int valuesPerPartition) throws AlreadyExistsException, HiveException {
+    Database db = new Database();
+    db.setName(dbName);
+    hive.createDatabase(db);
+
+    Table table = new Table(dbName, tableName);
+    table.setDbName(dbName);
+    table.setInputFormatClass(TextInputFormat.class);
+    table.setOutputFormatClass(HiveIgnoreKeyTextOutputFormat.class);
+    // create partition key schema
+    ArrayList<FieldSchema> partKeys = new ArrayList<FieldSchema>();
+    for (int i = 1; i <= numOfPartKeys; i++) {
+      String partName = "part" + String.valueOf(i);
+      partKeys.add(new FieldSchema(partName, serdeConstants.STRING_TYPE_NAME, ""));
+    }
+    table.setPartCols(partKeys);
+    // create table
+    hive.createTable(table);
+    table = hive.getTable(dbName, tableName);
+    if (valuesPerPartition == 0) {
+      return table;
+    }
+    // create partition specs
+    ArrayList<Map<String, String>> partitionSpecs = new ArrayList<Map<String,
String>>();
+    for (int partKeyIndex = 0; partKeyIndex < numOfPartKeys; partKeyIndex++) {
+      String partName = partKeys.get(partKeyIndex).getName();
+      Map<String, String> partMap = new HashMap<>();
+      for (int val = 1; val <= valuesPerPartition; val++) {
+        partMap.put(partName, String.valueOf(val));
+      }
+      partitionSpecs.add(partMap);
+    }
+
+    // create partitions
+    for (Map<String, String> partSpec : partitionSpecs) {
+      hive.createPartition(table, partSpec);
+    }
+
+    List<Partition> partitions = hive.getPartitions(table);
+    assertEquals(numOfPartKeys * valuesPerPartition, partitions.size());
+    return table;
+  }
+
+  /**
+   * Creates partition sub-directories for a given table on the file system. Used to test
the
+   * use-cases when partitions for the table are not present in the metastore db
+   *
+   * @param table - Table which provides the base locations and partition specs for creating
the
+   *          sub-directories
+   * @param numPartitions - Number of partitions to be created
+   * @throws IOException
+   */
+  private void createPartitionsDirectoriesOnFS(Table table, int numPartitions) throws IOException
{
+    String path = table.getDataLocation().toString();
+    fs = table.getPath().getFileSystem(hive.getConf());
+    int numPartKeys = table.getPartitionKeys().size();
+    for (int i = 0; i < numPartitions; i++) {
+      StringBuilder partPath = new StringBuilder(path);
+      partPath.append(Path.SEPARATOR);
+      for (int j = 0; j < numPartKeys; j++) {
+        FieldSchema field = table.getPartitionKeys().get(j);
+        partPath.append(field.getName());
+        partPath.append('=');
+        partPath.append("val_");
+        partPath.append(i);
+        if (j < (numPartKeys - 1)) {
+          partPath.append(Path.SEPARATOR);
+        }
+      }
+      createDirectory(partPath.toString());
+    }
+  }
+
+  private void createFile(String partPath, String filename) throws IOException {
+    Path part = new Path(partPath);
+    fs.mkdirs(part);
+    fs.createNewFile(new Path(partPath + Path.SEPARATOR + filename));
+    fs.deleteOnExit(part);
+  }
+
+  private void createDirectory(String partPath) throws IOException {
+    Path part = new Path(partPath);
+    fs.mkdirs(part);
+    fs.deleteOnExit(part);
+  }
 }


Mime
View raw message