hive-commits mailing list archives

From pxi...@apache.org
Subject hive git commit: HIVE-13984: Use multi-threaded approach to listing files for msck (Pengcheng Xiong, reviewed by Prasanth Jayachandran, Hari Sankar Sivarama Subramaniyan, Rajesh Balamohan)
Date Thu, 16 Jun 2016 18:33:38 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.1 5633df3c2 -> dcb8e3956


HIVE-13984: Use multi-threaded approach to listing files for msck (Pengcheng Xiong, reviewed
by Prasanth Jayachandran, Hari Sankar Sivarama Subramaniyan, Rajesh Balamohan)
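
In outline: the recursive, one-directory-at-a-time listing in
HiveMetaStoreChecker.getAllLeafDirs() is replaced by a breadth-first traversal
that lists all directories at a given level in parallel on a fixed thread pool,
collecting leaf directories into a concurrent map before descending to the next
level. The pool is sized by the existing hive.mv.files.thread setting (default
25); setting it to 0 falls back to a single-threaded walk. A self-contained
sketch of the pattern follows the diff below.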


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dcb8e395
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dcb8e395
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dcb8e395

Branch: refs/heads/branch-2.1
Commit: dcb8e395666a8792bfb62dd36a642443a6476ef3
Parents: 5633df3
Author: Pengcheng Xiong <pxiong@apache.org>
Authored: Thu Jun 16 11:30:20 2016 -0700
Committer: Pengcheng Xiong <pxiong@apache.org>
Committed: Thu Jun 16 11:33:25 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../hive/ql/metadata/HiveMetaStoreChecker.java  | 106 ++++++++++++++++---
 2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dcb8e395/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 33b0713..c60a193 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2401,8 +2401,9 @@ public class HiveConf extends Configuration {
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
 
-     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new  SizeValidator(0L, true, 1024L, true), "Number of threads"
-         + " used to move files in move task. Set it to 0 to disable multi-threaded file moves."),
+    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new  SizeValidator(0L, true, 1024L, true), "Number of threads"
+         + " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
+         + " MSCK to check tables."),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(

http://git-wip-us.apache.org/repos/asf/hive/blob/dcb8e395/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 10fa561..1122f8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -21,9 +21,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -39,6 +48,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
 import org.apache.thrift.TException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Verify that the information in the metastore matches what is on the
  * filesystem. Return a CheckResult object containing lists of missing and any
@@ -286,9 +297,10 @@ public class HiveMetaStoreChecker {
    *          Result object
    * @throws IOException
    *           Thrown if we fail at fetching listings from the fs.
+   * @throws HiveException 
    */
   void findUnknownPartitions(Table table, Set<Path> partPaths,
-      CheckResult result) throws IOException {
+      CheckResult result) throws IOException, HiveException {
 
     Path tablePath = table.getPath();
     // now check the table folder and see if we find anything
@@ -353,28 +365,88 @@ public class HiveMetaStoreChecker {
    *          This set will contain the leaf paths at the end.
    * @throws IOException
    *           Thrown if we can't get lists from the fs.
+   * @throws HiveException 
    */
 
-  private void getAllLeafDirs(Path basePath, Set<Path> allDirs)
-      throws IOException {
-    getAllLeafDirs(basePath, allDirs, basePath.getFileSystem(conf));
+  private void getAllLeafDirs(Path basePath, Set<Path> allDirs) throws IOException, HiveException {
+    ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
+    basePaths.add(basePath);
+    // we only use the keySet of ConcurrentHashMap
+    Map<Path, Object> dirSet = new ConcurrentHashMap<>();
+    // Reuse the HIVE_MOVE_FILES_THREAD_COUNT setting to size the listing pool;
+    // a value of 0 disables the threaded path entirely.
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors
+        .newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build())
+            : null;
+    if (pool == null) {
+      LOG.debug("Not-using threaded version of MSCK-GetPaths");
+    } else {
+      LOG.debug("Using threaded version of MSCK-GetPaths with number of threads "
+          + ((ThreadPoolExecutor) pool).getPoolSize());
+    }
+    getAllLeafDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf));
+    if (pool != null) pool.shutdown(); // no pool was created when threading is disabled
+    allDirs.addAll(dirSet.keySet());
   }
 
-  private void getAllLeafDirs(Path basePath, Set<Path> allDirs, FileSystem fs)
-      throws IOException {
-
-    FileStatus[] statuses = fs.listStatus(basePath, FileUtils.HIDDEN_FILES_PATH_FILTER);
-    boolean directoryFound=false;
+  // Process all paths at the current level in parallel, then descend to the next level.
+  private void getAllLeafDirs(final ExecutorService pool, final ConcurrentLinkedQueue<Path> basePaths,
+      final Map<Path, Object> allDirs, final FileSystem fs) throws IOException, HiveException {
+    final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
+    if (null == pool) {
+      for (final Path path : basePaths) {
+        FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+        boolean directoryFound = false;
+        for (FileStatus status : statuses) {
+          if (status.isDir()) {
+            directoryFound = true;
+            nextLevel.add(status.getPath());
+          }
+        }
 
-    for (FileStatus status : statuses) {
-      if (status.isDir()) {
-        directoryFound = true;
-        getAllLeafDirs(status.getPath(), allDirs, fs);
+        if (!directoryFound) {
+          allDirs.put(path, null);
+        }
+      }
+      if (!nextLevel.isEmpty()) {
+        getAllLeafDirs(pool, nextLevel, allDirs, fs);
+      }
+    } else {
+      final List<Future<Void>> futures = new LinkedList<>();
+      for (final Path path : basePaths) {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+            boolean directoryFound = false;
+
+            for (FileStatus status : statuses) {
+              if (status.isDir()) {
+                directoryFound = true;
+                nextLevel.add(status.getPath());
+              }
+            }
+
+            if (!directoryFound) {
+              allDirs.put(path, null);
+            }
+            return null;
+          }
+        }));
+      }
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          pool.shutdownNow();
+          throw new HiveException(e.getCause());
+        }
+      }
+      if (!nextLevel.isEmpty()) {
+        getAllLeafDirs(pool, nextLevel, allDirs, fs);
       }
-    }
-
-    if(!directoryFound){
-      allDirs.add(basePath);
     }
   }
 


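----------------------------------------------------------------------

For readers who want the pattern without the Hive plumbing, here is a minimal,
self-contained sketch of the same level-by-level parallel listing. It uses
java.nio.file in place of Hadoop's FileSystem and an iterative loop in place of
the patch's per-level recursion; the class and method names (LeafDirScanner,
findLeafDirs, listChildren) are illustrative and not part of the commit.
Requires Java 8+.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: level-by-level parallel search for leaf directories. */
public class LeafDirScanner {

  /** Returns every directory under root that contains no subdirectories. */
  public static Set<Path> findLeafDirs(Path root, int threads) throws Exception {
    final Set<Path> leaves = ConcurrentHashMap.newKeySet();
    final ExecutorService pool =
        threads > 0 ? Executors.newFixedThreadPool(threads) : null;
    ConcurrentLinkedQueue<Path> level = new ConcurrentLinkedQueue<>();
    level.add(root);
    try {
      while (!level.isEmpty()) {
        final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
        final List<Future<?>> futures = new ArrayList<>();
        for (final Path dir : level) {
          Runnable task = () -> listChildren(dir, nextLevel, leaves);
          if (pool == null) {
            task.run();                       // single-threaded fallback, as when the conf is 0
          } else {
            futures.add(pool.submit(task));   // list all dirs at this level in parallel
          }
        }
        for (Future<?> f : futures) {
          f.get();                            // propagate any listing failure
        }
        level = nextLevel;                    // descend one level
      }
    } finally {
      if (pool != null) pool.shutdown();
    }
    return leaves;
  }

  private static void listChildren(Path dir, ConcurrentLinkedQueue<Path> nextLevel,
      Set<Path> leaves) {
    boolean directoryFound = false;
    try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
      for (Path child : children) {
        if (Files.isDirectory(child)) {
          directoryFound = true;
          nextLevel.add(child);
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);          // surfaced via Future.get()
    }
    if (!directoryFound) {
      leaves.add(dir);                        // no subdirectories: this is a leaf
    }
  }

  public static void main(String[] args) throws Exception {
    for (Path leaf : findLeafDirs(Paths.get(args[0]), 8)) {
      System.out.println(leaf);
    }
  }
}

The level-at-a-time structure is where the speedup comes from: listing calls
for all sibling directories overlap instead of being serialized by a
depth-first walk, which matters most on high-latency filesystems, while
Future.get() still surfaces any listing failure to the caller, mirroring the
HiveException path in the patch.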