hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgyrtk...@apache.org
Subject hive git commit: HIVE-16119: HiveMetaStoreChecker: remove singleThread logic duplication (Zoltan Haindrich reviewed by Vihang Karajgaonkar, Ashutosh Chauhan)
Date Thu, 09 Mar 2017 08:05:56 GMT
Repository: hive
Updated Branches:
  refs/heads/master 74372256d -> ed2f46aa7


HIVE-16119: HiveMetaStoreChecker: remove singleThread logic duplication (Zoltan Haindrich
reviewed by Vihang Karajgaonkar, Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed2f46aa
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed2f46aa
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed2f46aa

Branch: refs/heads/master
Commit: ed2f46aa737efb859f23d357fbeafe1b42e7d404
Parents: 7437225
Author: Zoltan Haindrich <kirk@rxd.hu>
Authored: Thu Mar 9 08:32:35 2017 +0100
Committer: Zoltan Haindrich <kirk@rxd.hu>
Committed: Thu Mar 9 08:55:43 2017 +0100

----------------------------------------------------------------------
 .../hive/ql/metadata/HiveMetaStoreChecker.java  | 90 ++++----------------
 .../ql/metadata/TestHiveMetaStoreChecker.java   | 39 +++++----
 2 files changed, 35 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ed2f46aa/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 3420ef8..6805c17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -28,11 +28,12 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.slf4j.Logger;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.thrift.TException;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -411,35 +413,19 @@ public class HiveMetaStoreChecker {
     // pool here the smaller sized pool of the two becomes a bottleneck
     int poolSize = conf.getInt(ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT.varname, 15);
 
-    // Check if too low config is provided for move files. 2x CPU is reasonable max count.
-    poolSize = poolSize == 0 ? poolSize : Math.max(poolSize,
-        getMinPoolSize());
-
-    // Fixed thread pool on need basis
-    final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor)
-        Executors.newFixedThreadPool(poolSize,
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) : null;
-
-    if (pool == null) {
-      LOG.debug("Not-using threaded version of MSCK-GetPaths");
-      Queue<Path> basePaths = new LinkedList<>();
-      basePaths.add(basePath);
-      checkPartitionDirsSingleThreaded(basePaths, allDirs, basePath.getFileSystem(conf), maxDepth,
-          maxDepth);
+    ExecutorService executor;
+    if (poolSize <= 1) {
+      LOG.debug("Using single-threaded version of MSCK-GetPaths");
+      executor = MoreExecutors.sameThreadExecutor();
     } else {
-      LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads "
-          + pool.getMaximumPoolSize());
-      checkPartitionDirsInParallel((ThreadPoolExecutor) pool, basePath, allDirs,
-          basePath.getFileSystem(conf), maxDepth);
+      LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " + poolSize);
+      ThreadFactory threadFactory =
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
+      executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory);
     }
-    if (pool != null) {
-      pool.shutdown();
-    }
-  }
+    checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), maxDepth);
 
-  @VisibleForTesting
-  int getMinPoolSize() {
-    return Runtime.getRuntime().availableProcessors() * 2;
+    executor.shutdown();
   }
 
   private final class PathDepthInfoCallable implements Callable<Path> {
@@ -515,7 +501,7 @@ public class HiveMetaStoreChecker {
     }
   }
 
-  private void checkPartitionDirsInParallel(final ThreadPoolExecutor pool,
+  private void checkPartitionDirs(final ExecutorService executor,
       final Path basePath, final Set<Path> result,
       final FileSystem fs, final int maxDepth) throws HiveException {
     try {
@@ -534,7 +520,7 @@ public class HiveMetaStoreChecker {
         //process each level in parallel
         while(!nextLevel.isEmpty()) {
           futures.add(
-              pool.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
+              executor.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
         }
         while(!futures.isEmpty()) {
           Path p = futures.poll().get();
@@ -547,52 +533,8 @@ public class HiveMetaStoreChecker {
       }
     } catch (InterruptedException | ExecutionException e) {
       LOG.error(e.getMessage());
-      pool.shutdownNow();
+      executor.shutdownNow();
       throw new HiveException(e.getCause());
     }
   }
-
-  /*
-   * Original recursive implementation works well for single threaded use-case but has limitations
-   * if we attempt to parallelize this directly
-   */
-  private void checkPartitionDirsSingleThreaded(Queue<Path> basePaths, final Set<Path> allDirs,
-      final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
-    for (final Path path : basePaths) {
-      FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
-      final Queue<Path> nextLevel = new LinkedList<>();
-      boolean fileFound = false;
-      for (FileStatus status : statuses) {
-        if (status.isDirectory()) {
-          nextLevel.add(status.getPath());
-        } else {
-          fileFound = true;
-        }
-      }
-      if (depth != 0) {
-        // we are in the middle of the search and we find a file
-        if (fileFound) {
-          if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
-            throw new HiveException(
-                "MSCK finds a file rather than a folder when it searches for " + path.toString());
-          } else {
-            LOG.warn("MSCK finds a file rather than a folder when it searches for "
-                + path.toString());
-          }
-        }
-        if (!nextLevel.isEmpty()) {
-          checkPartitionDirsSingleThreaded(nextLevel, allDirs, fs, depth - 1, maxDepth);
-        } else if (depth != maxDepth) {
-          // since nextLevel is empty, we are missing partition columns.
-          if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
-            throw new HiveException("MSCK is missing partition columns under " + path.toString());
-          } else {
-            LOG.warn("MSCK is missing partition columns under " + path.toString());
-          }
-        }
-      } else {
-        allDirs.add(path);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed2f46aa/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
index f9bcc52..21bc8ee 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.metadata;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -25,9 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Lists;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -43,6 +39,10 @@ import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.thrift.TException;
 import org.mockito.Mockito;
 
+import com.google.common.collect.Lists;
+
+import junit.framework.TestCase;
+
 /**
  * TestHiveMetaStoreChecker.
  *
@@ -359,11 +359,7 @@ public class TestHiveMetaStoreChecker extends TestCase {
       throws HiveException, AlreadyExistsException, IOException {
     // set num of threads to 0 so that single-threaded checkMetastore is called
     hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 0);
-    // currently HiveMetastoreChecker uses a minimum pool size of 2*numOfProcs
-    // no other easy way to set it deterministically for this test case
-    checker = Mockito.spy(checker);
-    Mockito.when(checker.getMinPoolSize()).thenReturn(2);
-    int poolSize = checker.getMinPoolSize();
+    int poolSize = 2;
     // create a deeply nested table which has more partition keys than the pool size
     Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0);
     // add 10 partitions on the filesystem
@@ -385,11 +381,8 @@ public class TestHiveMetaStoreChecker extends TestCase {
    */
   public void testDeeplyNestedPartitionedTables()
       throws HiveException, AlreadyExistsException, IOException {
-    // currently HiveMetastoreChecker uses a minimum pool size of 2*numOfProcs
-    // no other easy way to set it deterministically for this test case
-    int poolSize = checker.getMinPoolSize();
-    checker = Mockito.spy(checker);
-    Mockito.when(checker.getMinPoolSize()).thenReturn(2);
+    hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 2);
+    int poolSize = 2;
     // create a deeply nested table which has more partition keys than the pool size
     Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0);
     // add 10 partitions on the filesystem
@@ -420,18 +413,22 @@ public class TestHiveMetaStoreChecker extends TestCase {
     createDirectory(sb.toString());
     //check result now
     CheckResult result = new CheckResult();
+    Exception exception = null;
     try {
       checker.checkMetastore(dbName, tableName, null, result);
     } catch (Exception e) {
-      assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException);
+      exception = e;
     }
+    assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException);
     createFile(sb.toString(), "dummyFile");
     result = new CheckResult();
+    exception = null;
     try {
       checker.checkMetastore(dbName, tableName, null, result);
     } catch (Exception e) {
-      assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException);
+      exception = e;
     }
+    assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException);
   }
 
   /*
@@ -452,20 +449,22 @@ public class TestHiveMetaStoreChecker extends TestCase {
     createDirectory(sb.toString());
     // check result now
     CheckResult result = new CheckResult();
+    Exception exception = null;
     try {
       checker.checkMetastore(dbName, tableName, null, result);
     } catch (Exception e) {
-      assertTrue("Expected exception HiveException got " + e.getClass(),
-          e instanceof HiveException);
+      exception = e;
     }
+    assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException);
     createFile(sb.toString(), "dummyFile");
     result = new CheckResult();
+    exception = null;
     try {
       checker.checkMetastore(dbName, tableName, null, result);
     } catch (Exception e) {
-      assertTrue("Expected exception HiveException got " + e.getClass(),
-          e instanceof HiveException);
+      exception = e;
     }
+    assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException);
   }
   /**
   * Creates a test partitioned table with the required level of nested partitions and number of


Mime
View raw message