From: pxiong@apache.org
To: commits@hive.apache.org
Message-Id: <534c7d7040254244b17b1c843487ae8e@git.apache.org>
Subject: hive git commit: HIVE-13984: Use multi-threaded approach to listing files for msck (Pengcheng Xiong, reviewed by Prasanth Jayachandran, Hari Sankar Sivarama Subramaniyan, Rajesh Balamohan)
Date: Thu, 16 Jun 2016 18:33:38 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/branch-2.1 5633df3c2 -> dcb8e3956


HIVE-13984: Use multi-threaded approach to listing files for msck (Pengcheng Xiong, reviewed by Prasanth Jayachandran, Hari Sankar Sivarama Subramaniyan, Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dcb8e395
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dcb8e395
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dcb8e395

Branch: refs/heads/branch-2.1
Commit: dcb8e395666a8792bfb62dd36a642443a6476ef3
Parents: 5633df3
Author: Pengcheng Xiong
Authored: Thu Jun 16 11:30:20 2016 -0700
Committer: Pengcheng Xiong
Committed: Thu Jun 16 11:33:25 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../hive/ql/metadata/HiveMetaStoreChecker.java  | 106 ++++++++++++++++---
 2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
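A word on what changed before the diffs: prior to this patch, HiveMetaStoreChecker.getAllLeafDirs() walked the table's directory tree recursively on a single thread, so an msck over a heavily partitioned table paid one sequential listStatus round trip per directory. The patch switches to a level-by-level traversal that lists every directory of the current level in parallel on a shared thread pool and collects the next level as it goes. The following standalone sketch is not the committed code: it substitutes java.nio.file for Hadoop's FileSystem API so it runs without a cluster, skips Hive's hidden-file filter, and all class and method names are illustrative. It shows the same technique:

import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLeafDirLister {

  // Collects every leaf directory (one with no subdirectories) under base,
  // listing each level of the tree in parallel, as the patch does.
  static Set<Path> leafDirs(Path base, int threads) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    Set<Path> leaves = ConcurrentHashMap.newKeySet();
    Queue<Path> level = new ConcurrentLinkedQueue<>();
    level.add(base);
    try {
      while (!level.isEmpty()) {
        Queue<Path> next = new ConcurrentLinkedQueue<>();
        List<Future<?>> futures = new ArrayList<>();
        for (Path dir : level) {
          futures.add(pool.submit(() -> {
            boolean hasSubdir = false;
            try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
              for (Path child : children) {
                if (Files.isDirectory(child)) {
                  hasSubdir = true;
                  next.add(child); // queue for the next round of parallel listings
                }
              }
            }
            if (!hasSubdir) {
              leaves.add(dir); // no subdirectories: this is a leaf
            }
            return null;
          }));
        }
        for (Future<?> f : futures) {
          f.get(); // propagate the first listing failure, if any
        }
        level = next;
      }
    } finally {
      pool.shutdown();
    }
    return leaves;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(leafDirs(Paths.get(args[0]), 25));
  }
}

The committed version below applies the same idea to Hadoop paths, and reuses an existing Hive thread-count setting rather than introducing a new one.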
http://git-wip-us.apache.org/repos/asf/hive/blob/dcb8e395/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 33b0713..c60a193 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2401,8 +2401,9 @@ public class HiveConf extends Configuration {
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist",
         "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
-    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(0L, true, 1024L, true), "Number of threads"
-        + " used to move files in move task. Set it to 0 to disable multi-threaded file moves."),
+    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(0L, true, 1024L, true), "Number of threads"
+        + " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
+        + " MSCK to check tables."),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(
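The doc-string edit above is the entire HiveConf.java change: rather than adding a new knob, msck now derives its listing parallelism from the existing hive.mv.files.thread setting (default 25), and setting it to 0 (e.g. set hive.mv.files.thread=0; in a Hive session) routes the checker onto its single-threaded fallback path. A minimal sketch of the lookup the checker performs, assuming Hive's jars are on the classpath (the hard-coded 25 fallback mirrors the ConfVars default above):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class MsckThreadCountDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // The same lookup HiveMetaStoreChecker performs; 0 disables the pool.
    int threads = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25);
    System.out.println("msck would list partition directories with " + threads + " threads");
  }
}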
http://git-wip-us.apache.org/repos/asf/hive/blob/dcb8e395/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 10fa561..1122f8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -21,9 +21,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -39,6 +48,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
 import org.apache.thrift.TException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Verify that the information in the metastore matches what is on the
  * filesystem. Return a CheckResult object containing lists of missing and any
@@ -286,9 +297,10 @@ public class HiveMetaStoreChecker {
    *          Result object
    * @throws IOException
    *           Thrown if we fail at fetching listings from the fs.
+   * @throws HiveException
    */
   void findUnknownPartitions(Table table, Set<Path> partPaths,
-      CheckResult result) throws IOException {
+      CheckResult result) throws IOException, HiveException {
 
     Path tablePath = table.getPath();
     // now check the table folder and see if we find anything
@@ -353,28 +365,88 @@ public class HiveMetaStoreChecker {
    *          This set will contain the leaf paths at the end.
    * @throws IOException
    *           Thrown if we can't get lists from the fs.
+   * @throws HiveException
    */
-  private void getAllLeafDirs(Path basePath, Set<Path> allDirs)
-      throws IOException {
-    getAllLeafDirs(basePath, allDirs, basePath.getFileSystem(conf));
+  private void getAllLeafDirs(Path basePath, Set<Path> allDirs) throws IOException, HiveException {
+    ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
+    basePaths.add(basePath);
+    // we only use the keySet of ConcurrentHashMap
+    Map<Path, Object> dirSet = new ConcurrentHashMap<>();
+    // Here we just reuse the THREAD_COUNT configuration for
+    // HIVE_MOVE_FILES_THREAD_COUNT
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors
+        .newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build())
+        : null;
+    if (pool == null) {
+      LOG.debug("Not-using threaded version of MSCK-GetPaths");
+    } else {
+      LOG.debug("Using threaded version of MSCK-GetPaths with number of threads "
+          + ((ThreadPoolExecutor) pool).getPoolSize());
+    }
+    getAllLeafDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf));
+    pool.shutdown();
+    allDirs.addAll(dirSet.keySet());
   }
 
-  private void getAllLeafDirs(Path basePath, Set<Path> allDirs, FileSystem fs)
-      throws IOException {
-
-    FileStatus[] statuses = fs.listStatus(basePath, FileUtils.HIDDEN_FILES_PATH_FILTER);
-    boolean directoryFound=false;
+  // process the basePaths in parallel and then the next level of basePaths
+  private void getAllLeafDirs(final ExecutorService pool, final ConcurrentLinkedQueue<Path> basePaths,
+      final Map<Path, Object> allDirs, final FileSystem fs) throws IOException, HiveException {
+    final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
+    if (null == pool) {
+      for (final Path path : basePaths) {
+        FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+        boolean directoryFound = false;
+        for (FileStatus status : statuses) {
+          if (status.isDir()) {
+            directoryFound = true;
+            nextLevel.add(status.getPath());
+          }
+        }
 
-    for (FileStatus status : statuses) {
-      if (status.isDir()) {
-        directoryFound = true;
-        getAllLeafDirs(status.getPath(), allDirs, fs);
+        if (!directoryFound) {
+          allDirs.put(path, null);
+        }
+        if (!nextLevel.isEmpty()) {
+          getAllLeafDirs(pool, nextLevel, allDirs, fs);
+        }
+      }
+    } else {
+      final List<Future<Void>> futures = new LinkedList<>();
+      for (final Path path : basePaths) {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+            boolean directoryFound = false;
+
+            for (FileStatus status : statuses) {
+              if (status.isDir()) {
+                directoryFound = true;
+                nextLevel.add(status.getPath());
+              }
+            }
+
+            if (!directoryFound) {
+              allDirs.put(path, null);
+            }
+            return null;
+          }
+        }));
+      }
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.error(e.getMessage());
+          pool.shutdownNow();
+          throw new HiveException(e.getCause());
+        }
+      }
+      if (!nextLevel.isEmpty()) {
+        getAllLeafDirs(pool, nextLevel, allDirs, fs);
       }
-    }
-
-    if(!directoryFound){
-      allDirs.add(basePath);
     }
   }
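Two details of the threaded branch above are worth calling out. The pool threads are daemons with a recognizable name format, so a listing that hangs on an unresponsive filesystem cannot keep the JVM alive, and the MSCK-GetPaths-N threads are easy to pick out of a stack dump. And the future.get() loop fails fast: the first listing error triggers shutdownNow() and is rethrown as a HiveException. A standalone sketch of the Guava ThreadFactoryBuilder pattern used in the patch (the submitted task is illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class DaemonPoolDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same factory configuration as the patch: daemon threads named MSCK-GetPaths-0, -1, ...
    ExecutorService pool = Executors.newFixedThreadPool(4,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build());
    pool.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
    pool.shutdown();
    // Daemon threads die with the JVM, so wait for the task before main() returns.
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}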