From: hashutosh@apache.org
To: commits@hive.apache.org
Subject: hive git commit: HIVE-13933 : Add an option to turn off parallel file moves (Ashutosh Chauhan via Hari Sankar Subramaniyan)
Date: Mon, 6 Jun 2016 15:17:23 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/branch-2.1 592d0f139 -> 02490ec5a


HIVE-13933 : Add an option to turn off parallel file moves (Ashutosh Chauhan via Hari Sankar Subramaniyan)

Signed-off-by: Ashutosh Chauhan

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/02490ec5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/02490ec5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/02490ec5

Branch: refs/heads/branch-2.1
Commit: 02490ec5a6d23afd4d2878b2fecf9b840c7a0a60
Parents: 592d0f1
Author: Ashutosh Chauhan
Authored: Thu Jun 2 16:07:53 2016 -0700
Committer: Ashutosh Chauhan
Committed: Mon Jun 6 08:17:10 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 215 +++++++++++--------
 2 files changed, 131 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/02490ec5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 93af7b9..698ba66 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2396,8 +2396,8 @@ public class HiveConf extends Configuration {
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
-    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(1L, true, 1024L, true), "Number of threads"
-        + " used to move files in move task"),
+    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(0L, true, 1024L, true), "Number of threads"
+        + " used to move files in move task. Set it to 0 to disable multi-threaded file moves."),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(
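Note on the HiveConf change above: the SizeValidator lower bound widens from 1L to 0L so that hive.mv.files.thread=0 becomes a legal setting meaning "move files serially"; the default stays 25. A minimal standalone sketch of the accepted range (class and method names here are illustrative, not Hive's):

    public final class MoveThreadsCheck {
      // Mirrors the widened validator bounds (0L..1024L): 0 is now accepted
      // and means "no parallel file moves"; values outside the range fail.
      static int validate(long configured) {
        if (configured < 0L || configured > 1024L) {
          throw new IllegalArgumentException(
              "hive.mv.files.thread must be in [0, 1024], got " + configured);
        }
        return (int) configured;
      }

      public static void main(String[] args) {
        System.out.println(validate(0));  // legal after HIVE-13933
        System.out.println(validate(25)); // the default
      }
    }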
http://git-wip-us.apache.org/repos/asf/hive/blob/02490ec5/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 3a7d3bb..16d9b03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2647,10 +2647,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
     final boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>();
-    final ExecutorService pool = Executors.newFixedThreadPool(
-        conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
-
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null;
     for (FileStatus src : srcs) {
       FileStatus[] files;
       if (src.isDirectory()) {
@@ -2666,29 +2665,27 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
       final SessionState parentSession = SessionState.get();
       for (final FileStatus srcFile : files) {
+        final Path srcP = srcFile.getPath();
+        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+        // Strip off the file type, if any so we don't make:
+        // 000000_0.gz -> 000000_0.gz_copy_1
+        final String name;
+        final String filetype;
+        String itemName = srcP.getName();
+        int index = itemName.lastIndexOf('.');
+        if (index >= 0) {
+          filetype = itemName.substring(index);
+          name = itemName.substring(0, index);
+        } else {
+          name = itemName;
+          filetype = "";
+        }
-        futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
-          @Override
-          public ObjectPair<Path, Path> call() throws Exception {
-            SessionState.setCurrentSessionState(parentSession);
-            final Path srcP = srcFile.getPath();
-            final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
-            // Strip off the file type, if any so we don't make:
-            // 000000_0.gz -> 000000_0.gz_copy_1
-            final String name;
-            final String filetype;
-            String itemName = srcP.getName();
-            int index = itemName.lastIndexOf('.');
-            if (index >= 0) {
-              filetype = itemName.substring(index);
-              name = itemName.substring(0, index);
-            } else {
-              name = itemName;
-              filetype = "";
-            }
+        final String srcGroup = srcFile.getGroup();
+        if (null == pool) {
+          Path destPath = new Path(destf, srcP.getName());
+          try {
-            Path destPath = new Path(destf, srcP.getName());
-            String srcGroup = srcFile.getGroup();
             if (!needToCopy && !isSrcLocal) {
               for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
                 destPath = new Path(destf, name + ("_copy_" + counter) + filetype);

@@ -2696,27 +2693,59 @@ private void constructOneLBLocationMap(FileStatus fSta,
               }
             } else {
               destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype);
             }
-
-            if (inheritPerms) {
-              HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
-            }
             if (null != newFiles) {
               newFiles.add(destPath);
             }
-            return ObjectPair.create(srcP, destPath);
+          } catch (IOException ioe) {
+            LOG.error("Failed to move: {}", ioe.getMessage());
+            throw new HiveException(ioe.getCause());
           }
-        }));
+        } else {
+          futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
+            @Override
+            public ObjectPair<Path, Path> call() throws Exception {
+              SessionState.setCurrentSessionState(parentSession);
+              Path destPath = new Path(destf, srcP.getName());
+              if (!needToCopy && !isSrcLocal) {
+                for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
+                  destPath = new Path(destf, name + ("_copy_" + counter) + filetype);
+                }
+              } else {
+                destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype);
+              }
+
+              if (inheritPerms) {
+                HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
+              }
+              if (null != newFiles) {
+                newFiles.add(destPath);
+              }
+              return ObjectPair.create(srcP, destPath);
+            }
+          }));
+        }
       }
     }
-    pool.shutdown();
-    for (Future<ObjectPair<Path, Path>> future : futures) {
-      try {
-        ObjectPair<Path, Path> pair = future.get();
-        LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: {}", pair.getSecond().toString());
-      } catch (Exception e) {
-        LOG.error("Failed to move: {}", e.getMessage());
-        pool.shutdownNow();
-        throw new HiveException(e.getCause());
+    if (null == pool) {
+      if (inheritPerms) {
+        try {
+          HdfsUtils.setFullFileStatus(conf, fullDestStatus, null, destFs, destf, true);
+        } catch (IOException e) {
+          LOG.error("Failed to move: {}", e.getMessage());
+          throw new HiveException(e.getCause());
+        }
+      }
+    } else {
+      pool.shutdown();
+      for (Future<ObjectPair<Path, Path>> future : futures) {
+        try {
+          ObjectPair<Path, Path> pair = future.get();
+          LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: {}", pair.getSecond().toString());
+        } catch (Exception e) {
+          LOG.error("Failed to move: {}", e.getMessage());
+          pool.shutdownNow();
+          throw new HiveException(e.getCause());
+        }
       }
     }
   }
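The restructuring above follows one pattern at every call site: build the pool only when the configured thread count is positive, and treat a null pool as "run each move inline on the calling thread". A self-contained sketch of that pattern (not Hive code; the task body is a stand-in for the actual file move):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class OptionalPoolDemo {
      public static void main(String[] args) throws Exception {
        int threads = 0; // e.g. hive.mv.files.thread=0 -> serial execution
        ExecutorService pool =
            threads > 0 ? Executors.newFixedThreadPool(threads) : null;
        List<Future<String>> futures = new ArrayList<>();
        for (String src : new String[] {"000000_0", "000001_0"}) {
          Callable<String> task = () -> "moved " + src; // stand-in for mvFile
          if (pool == null) {
            System.out.println(task.call()); // inline: failures throw right here
          } else {
            futures.add(pool.submit(task));  // parallel: failures surface on get()
          }
        }
        if (pool != null) {
          pool.shutdown();
          for (Future<String> f : futures) {
            System.out.println(f.get());
          }
        }
      }
    }

One behavioural nuance visible in the hunk above: on the serial path, inheritPerms is applied once to the whole destination directory after the loop (setFullFileStatus on destf with the recursive flag) rather than per moved file.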
@@ -2862,42 +2891,51 @@ private void constructOneLBLocationMap(FileStatus fSta,
       FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
 
       List<Future<Void>> futures = new LinkedList<>();
-      final ExecutorService pool = Executors.newFixedThreadPool(
-          conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+      final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+          Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null;
       /* Move files one by one because source is a subdirectory of destination */
-      for (final FileStatus status : srcs) {
-        futures.add(pool.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            SessionState.setCurrentSessionState(parentSession);
-            Path destPath = new Path(destf, status.getPath().getName());
-            String group = status.getGroup();
-            try {
-              if(destFs.rename(status.getPath(), destf)) {
+      for (final FileStatus srcStatus : srcs) {
+
+        if (null == pool) {
+          if(!destFs.rename(srcStatus.getPath(), destf)) {
+            throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest:"
+                + destf + " returned false");
+          }
+        } else {
+          futures.add(pool.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              SessionState.setCurrentSessionState(parentSession);
+              final Path destPath = new Path(destf, srcStatus.getPath().getName());
+              final String group = srcStatus.getGroup();
+              if(destFs.rename(srcStatus.getPath(), destf)) {
                 if (inheritPerms) {
                   HdfsUtils.setFullFileStatus(conf, desiredStatus, group, destFs, destPath, false);
                 }
               } else {
-                throw new IOException("rename for src path: " + status.getPath() + " to dest path:"
+                throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path:"
                     + destPath + " returned false");
               }
-            } catch (IOException ioe) {
-              LOG.error("Failed to rename/set permissions. Src path: {} Dest path: {}", status.getPath(), destPath);
-              throw ioe;
+              return null;
             }
-            return null;
-          }
-        }));
+          }));
+        }
       }
-      pool.shutdown();
-      for (Future<Void> future : futures) {
-        try {
-          future.get();
-        } catch (Exception e) {
-          LOG.debug(e.getMessage());
-          pool.shutdownNow();
-          throw new HiveException(e.getCause());
+      if (null == pool) {
+        if (inheritPerms) {
+          HdfsUtils.setFullFileStatus(conf, desiredStatus, null, destFs, destf, true);
+        }
+      } else {
+        pool.shutdown();
+        for (Future<Void> future : futures) {
+          try {
+            future.get();
+          } catch (Exception e) {
+            LOG.debug(e.getMessage());
+            pool.shutdownNow();
+            throw new HiveException(e.getCause());
+          }
        }
      }
      return true;
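In this hunk the serial branch also drops the per-file try/catch that logged "Failed to rename/set permissions", letting the IOException propagate directly to the caller. A sketch of that serial branch using java.nio (an assumption made for self-containment; Hive uses Hadoop's FileSystem, whose rename() reports failure by returning false rather than throwing):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;

    public class SerialMoveDemo {
      // Move each child of srcDir up into destDir, one at a time,
      // failing fast on the first move that does not succeed.
      static void moveChildrenUp(Path srcDir, Path destDir) throws IOException {
        try (Stream<Path> children = Files.list(srcDir)) {
          for (Path child : (Iterable<Path>) children::iterator) {
            Files.move(child, destDir.resolve(child.getFileName()));
          }
        }
      }
    }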
@@ -3182,28 +3220,33 @@ private void constructOneLBLocationMap(FileStatus fSta,
     FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
     boolean result = true;
     final List<Future<Boolean>> futures = new LinkedList<>();
-    final ExecutorService pool = Executors.newFixedThreadPool(
-        conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build());
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null;
     final SessionState parentSession = SessionState.get();
     for (final FileStatus status : statuses) {
-      futures.add(pool.submit(new Callable<Boolean>() {
-
-        @Override
-        public Boolean call() throws Exception {
-          SessionState.setCurrentSessionState(parentSession);
-          return FileUtils.moveToTrash(fs, status.getPath(), conf);
-        }
-      }));
+      if (null == pool) {
+        result &= FileUtils.moveToTrash(fs, status.getPath(), conf);
+      } else {
+        futures.add(pool.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            SessionState.setCurrentSessionState(parentSession);
+            return FileUtils.moveToTrash(fs, status.getPath(), conf);
+          }
+        }));
+      }
     }
-    pool.shutdown();
-    for (Future<Boolean> future : futures) {
-      try {
-        result &= future.get();
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Failed to delete: ",e);
-        pool.shutdownNow();
-        throw new IOException(e);
+    if (null != pool) {
+      pool.shutdown();
+      for (Future<Boolean> future : futures) {
+        try {
+          result &= future.get();
+        } catch (InterruptedException | ExecutionException e) {
+          LOG.error("Failed to delete: ",e);
+          pool.shutdownNow();
+          throw new IOException(e);
+        }
      }
    }
    return result;
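The hunk above keeps the method's boolean contract on both paths: each per-file moveToTrash result is AND-ed into a single success flag, whether it is computed inline or on a Delete-Thread worker. A runnable sketch of that aggregation (task bodies are stand-ins for FileUtils.moveToTrash):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class TrashAggregationDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2); // thread count > 0
        List<Future<Boolean>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
          final int n = i;
          futures.add(pool.submit(() -> n != 3)); // pretend one move fails
        }
        pool.shutdown();
        boolean result = true;
        for (Future<Boolean> f : futures) {
          result &= f.get(); // one false makes the whole call report failure
        }
        System.out.println("all files trashed: " + result); // prints false
      }
    }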