Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5973B200C36 for ; Fri, 10 Mar 2017 23:23:38 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 57EBB160B82; Fri, 10 Mar 2017 22:23:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 59FB7160B67 for ; Fri, 10 Mar 2017 23:23:37 +0100 (CET) Received: (qmail 83378 invoked by uid 500); 10 Mar 2017 22:23:36 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 83366 invoked by uid 99); 10 Mar 2017 22:23:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Mar 2017 22:23:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 69D88DFDC8; Fri, 10 Mar 2017 22:23:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: spena@apache.org To: commits@hive.apache.org Message-Id: <2152a92abfda424895808dca16c13c4c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-14864: Distcp is not called from MoveTask when src is a directory (Sahil Takiar, reviewed by Sergio Pena) Date: Fri, 10 Mar 2017 22:23:36 +0000 (UTC) archived-at: Fri, 10 Mar 2017 22:23:38 -0000 Repository: hive Updated Branches: refs/heads/master f44bf6fe4 -> 9d3c33b11 HIVE-14864: Distcp is not called from MoveTask when src is a directory (Sahil Takiar, reviewed by Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/hive/repo 
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9d3c33b1 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9d3c33b1 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9d3c33b1 Branch: refs/heads/master Commit: 9d3c33b11f9cb728667c690bb7b324d897a0951f Parents: f44bf6f Author: Sahil Takiar Authored: Fri Mar 10 16:23:12 2017 -0600 Committer: Sergio Pena Committed: Fri Mar 10 16:23:12 2017 -0600 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/FileUtils.java | 42 ++++++-- .../org/apache/hadoop/hive/conf/HiveConf.java | 7 +- .../hadoop/hive/common/TestFileUtils.java | 32 ++++++ itests/hive-unit-hadoop2/pom.xml | 5 + .../hadoop/hive/common/TestFileUtils.java | 103 +++++++++++++++++++ 5 files changed, 176 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/common/src/java/org/apache/hadoop/hive/common/FileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 9e07c08..9a0521c 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -32,7 +32,10 @@ import java.util.List; import java.util.Random; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -563,21 +566,38 @@ public final class FileUtils { boolean deleteSource, boolean overwrite, HiveConf conf) throws IOException { + return copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf, ShimLoader.getHadoopShims()); + } - HadoopShims shims = 
ShimLoader.getHadoopShims(); - boolean copied; + @VisibleForTesting + static boolean copy(FileSystem srcFS, Path src, + FileSystem dstFS, Path dst, + boolean deleteSource, + boolean overwrite, + HiveConf conf, HadoopShims shims) throws IOException { + + boolean copied = false; + boolean triedDistcp = false; /* Run distcp if source file/dir is too big */ - if (srcFS.getUri().getScheme().equals("hdfs") && - srcFS.getFileStatus(src).getLen() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) { - LOG.info("Source is " + srcFS.getFileStatus(src).getLen() + " bytes. (MAX: " + conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")"); - LOG.info("Launch distributed copy (distcp) job."); - HiveConfUtil.updateJobCredentialProviders(conf); - copied = shims.runDistCp(src, dst, conf); - if (copied && deleteSource) { - srcFS.delete(src, true); + if (srcFS.getUri().getScheme().equals("hdfs")) { + ContentSummary srcContentSummary = srcFS.getContentSummary(src); + if (srcContentSummary.getFileCount() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + && srcContentSummary.getLength() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) { + + LOG.info("Source is " + srcContentSummary.getLength() + " bytes. (MAX: " + conf.getLongVar( + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")"); + LOG.info("Source is " + srcContentSummary.getFileCount() + " files. 
(MAX: " + conf.getLongVar( + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")"); + LOG.info("Launch distributed copy (distcp) job."); + triedDistcp = true; + copied = shims.runDistCp(src, dst, conf); + if (copied && deleteSource) { + srcFS.delete(src, true); + } } - } else { + } + if (!triedDistcp) { copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf); } http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a479deb..f68bd35 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1162,9 +1162,12 @@ public class HiveConf extends Configuration { HIVE_GROUPBY_LIMIT_EXTRASTEP("hive.groupby.limit.extrastep", true, "This parameter decides if Hive should \n" + "create new MR job for sorting final output"), - // Max filesize used to do a single copy (after that, distcp is used) + // Max file num and size used to do a single copy (after that, distcp is used) + HIVE_EXEC_COPYFILE_MAXNUMFILES("hive.exec.copyfile.maxnumfiles", 1L, + "Maximum number of files Hive uses to do sequential HDFS copies between directories." + + "Distributed copies (distcp) will be used instead for larger numbers of files so that copies can be done faster."), HIVE_EXEC_COPYFILE_MAXSIZE("hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/, - "Maximum file size (in Mb) that Hive uses to do single HDFS copies between directories." + + "Maximum file size (in bytes) that Hive uses to do single HDFS copies between directories." 
+ "Distributed copies (distcp) will be used instead for bigger files so that copies can be done faster."), // for hive udtf operator http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java index 5705028..03fcaeb 100644 --- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java +++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java @@ -22,18 +22,28 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.HashSet; import java.util.Set; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.shims.HadoopShims; + import org.junit.Assert; import org.junit.Test; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -197,4 +207,26 @@ public class TestFileUtils { boolean result = parents.contains(key); assertEquals("key=" + key, expected, result); } + + @Test + public void testCopyWithDistcp() throws IOException { + Path copySrc = new Path("copySrc"); + Path copyDst = new Path("copyDst"); + HiveConf conf = new HiveConf(TestFileUtils.class); + conf.set(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS.varname, "false"); + + FileSystem mockFs = mock(FileSystem.class); + 
when(mockFs.getUri()).thenReturn(URI.create("hdfs:///")); + + ContentSummary mockContentSummary = mock(ContentSummary.class); + when(mockContentSummary.getFileCount()).thenReturn(Long.MAX_VALUE); + when(mockContentSummary.getLength()).thenReturn(Long.MAX_VALUE); + when(mockFs.getContentSummary(any(Path.class))).thenReturn(mockContentSummary); + + HadoopShims shims = mock(HadoopShims.class); + when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(true); + + Assert.assertTrue(FileUtils.copy(mockFs, copySrc, mockFs, copyDst, false, false, conf, shims)); + verify(shims).runDistCp(copySrc, copyDst, conf); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/itests/hive-unit-hadoop2/pom.xml ---------------------------------------------------------------------- diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml index 44135d6..d15bd54 100644 --- a/itests/hive-unit-hadoop2/pom.xml +++ b/itests/hive-unit-hadoop2/pom.xml @@ -152,6 +152,11 @@ org.apache.hadoop + hadoop-distcp + ${hadoop.version} + + + org.apache.hadoop hadoop-hdfs ${hadoop.version} tests http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java new file mode 100644 index 0000000..f143315 --- /dev/null +++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java @@ -0,0 +1,103 @@ +package org.apache.hadoop.hive.common; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; + +import org.apache.hadoop.fs.FileSystem; +import 
org.apache.hadoop.fs.Path; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Integration tests for {@link FileUtils}. Tests run against a {@link HadoopShims.MiniDFSShim}. + */ +public class TestFileUtils { + + private static final Path basePath = new Path("/tmp/"); + + private static HiveConf conf; + private static FileSystem fs; + private static HadoopShims.MiniDFSShim dfs; + + @BeforeClass + public static void setup() throws Exception { + conf = new HiveConf(TestFileUtils.class); + dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null); + fs = dfs.getFileSystem(); + } + + @Test + public void testCopySingleEmptyFile() throws IOException { + String file1Name = "file1.txt"; + Path copySrc = new Path(basePath, "copySrc"); + Path copyDst = new Path(basePath, "copyDst"); + try { + fs.create(new Path(basePath, new Path(copySrc, file1Name))).close(); + Assert.assertTrue("FileUtils.copy failed to copy data", + FileUtils.copy(fs, copySrc, fs, copyDst, false, false, conf)); + + Path dstFileName1 = new Path(copyDst, file1Name); + Assert.assertTrue(fs.exists(new Path(copyDst, file1Name))); + Assert.assertEquals(fs.getFileStatus(dstFileName1).getLen(), 0); + } finally { + try { + fs.delete(copySrc, true); + fs.delete(copyDst, true); + } catch (IOException e) { + // Do nothing + } + } + } + + @Test + public void testCopyWithDistcp() throws IOException { + String file1Name = "file1.txt"; + String file2Name = "file2.txt"; + Path copySrc = new Path(basePath, "copySrc"); + Path copyDst = new Path(basePath, "copyDst"); + Path srcFile1 = new Path(basePath, new Path(copySrc, file1Name)); + Path srcFile2 = new Path(basePath, new Path(copySrc, file2Name)); + try { + OutputStream os1 = fs.create(srcFile1); + os1.write(new byte[]{1, 2, 3}); + os1.close(); + + OutputStream os2 = fs.create(srcFile2); + os2.write(new byte[]{1, 2, 3}); + os2.close(); + + 
conf.set(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname, "1"); + conf.set(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1"); + Assert.assertTrue("FileUtils.copy failed to copy data", + FileUtils.copy(fs, copySrc, fs, copyDst, false, false, conf)); + + Path dstFileName1 = new Path(copyDst, file1Name); + Assert.assertTrue(fs.exists(new Path(copyDst, file1Name))); + Assert.assertEquals(fs.getFileStatus(dstFileName1).getLen(), 3); + + Path dstFileName2 = new Path(copyDst, file2Name); + Assert.assertTrue(fs.exists(new Path(copyDst, file2Name))); + Assert.assertEquals(fs.getFileStatus(dstFileName2).getLen(), 3); + } finally { + try { + fs.delete(copySrc, true); + fs.delete(copyDst, true); + } catch (IOException e) { + // Do nothing + } + } + } + + @AfterClass + public static void shutdown() throws IOException { + fs.close(); + dfs.shutdown(); + } +}