hive-commits mailing list archives

From: sp...@apache.org
Subject: hive git commit: HIVE-14864: Distcp is not called from MoveTask when src is a directory (Sahil Takiar, reviewed by Sergio Pena)
Date: Fri, 10 Mar 2017 22:23:36 GMT
Repository: hive
Updated Branches:
  refs/heads/master f44bf6fe4 -> 9d3c33b11


HIVE-14864: Distcp is not called from MoveTask when src is a directory (Sahil Takiar, reviewed by Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9d3c33b1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9d3c33b1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9d3c33b1

Branch: refs/heads/master
Commit: 9d3c33b11f9cb728667c690bb7b324d897a0951f
Parents: f44bf6f
Author: Sahil Takiar <takiar.sahil@gmail.com>
Authored: Fri Mar 10 16:23:12 2017 -0600
Committer: Sergio Pena <sergio.pena@cloudera.com>
Committed: Fri Mar 10 16:23:12 2017 -0600

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |  42 ++++++--
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +-
 .../hadoop/hive/common/TestFileUtils.java       |  32 ++++++
 itests/hive-unit-hadoop2/pom.xml                |   5 +
 .../hadoop/hive/common/TestFileUtils.java       | 103 +++++++++++++++++++
 5 files changed, 176 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
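
For context (an inference from the diff below, not stated in the original commit message): the old size check used FileStatus#getLen, which for an HDFS directory reports the length of the directory entry itself, effectively zero, so a directory source never crossed the copyfile threshold and distcp was never launched. The patch switches to FileSystem#getContentSummary, which aggregates byte size and file count over the whole tree. A minimal sketch of the contrast, assuming fs is an HDFS FileSystem and dir a directory Path (both hypothetical, for illustration only):

    // Old check: a directory's own FileStatus length is ~0 on HDFS,
    // so "len > hive.exec.copyfile.maxsize" never fired for directories.
    long flatLen = fs.getFileStatus(dir).getLen();

    // New check: ContentSummary walks the tree and aggregates.
    ContentSummary cs = fs.getContentSummary(dir);
    long totalBytes = cs.getLength();     // recursive sum of file lengths
    long numFiles = cs.getFileCount();    // recursive file count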


http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 9e07c08..9a0521c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -32,7 +32,10 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -563,21 +566,38 @@ public final class FileUtils {
     boolean deleteSource,
     boolean overwrite,
     HiveConf conf) throws IOException {
+    return copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf, ShimLoader.getHadoopShims());
+  }
 
-    HadoopShims shims = ShimLoader.getHadoopShims();
-    boolean copied;
+  @VisibleForTesting
+  static boolean copy(FileSystem srcFS, Path src,
+    FileSystem dstFS, Path dst,
+    boolean deleteSource,
+    boolean overwrite,
+    HiveConf conf, HadoopShims shims) throws IOException {
+
+    boolean copied = false;
+    boolean triedDistcp = false;
 
     /* Run distcp if source file/dir is too big */
-    if (srcFS.getUri().getScheme().equals("hdfs") &&
-        srcFS.getFileStatus(src).getLen() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) {
-      LOG.info("Source is " + srcFS.getFileStatus(src).getLen() + " bytes. (MAX: " + conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")");
-      LOG.info("Launch distributed copy (distcp) job.");
-      HiveConfUtil.updateJobCredentialProviders(conf);
-      copied = shims.runDistCp(src, dst, conf);
-      if (copied && deleteSource) {
-        srcFS.delete(src, true);
+    if (srcFS.getUri().getScheme().equals("hdfs")) {
+      ContentSummary srcContentSummary = srcFS.getContentSummary(src);
+      if (srcContentSummary.getFileCount() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES)
+              && srcContentSummary.getLength() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) {
+
+        LOG.info("Source is " + srcContentSummary.getLength() + " bytes. (MAX: " + conf.getLongVar(
+                HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")");
+        LOG.info("Source is " + srcContentSummary.getFileCount() + " files. (MAX: " + conf.getLongVar(
+                HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")");
+        LOG.info("Launch distributed copy (distcp) job.");
+        triedDistcp = true;
+        copied = shims.runDistCp(src, dst, conf);
+        if (copied && deleteSource) {
+          srcFS.delete(src, true);
+        }
       }
-    } else {
+    }
+    if (!triedDistcp) {
       copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
     }
 

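Callers are unaffected: the public FileUtils.copy keeps its signature and delegates to a new package-private overload that accepts a HadoopShims, which is what lets the distcp path be exercised with a mock (see the Mockito test further down). A hedged usage sketch, assuming a configured HiveConf and hypothetical warehouse paths, with error handling elided:

    FileSystem fs = FileSystem.get(conf);
    Path src = new Path("/warehouse/staging");   // hypothetical
    Path dst = new Path("/warehouse/final");     // hypothetical
    // distcp runs only when the source exceeds BOTH hive.exec.copyfile.maxnumfiles
    // and hive.exec.copyfile.maxsize; otherwise this falls through to FileUtil.copy.
    boolean copied = FileUtils.copy(fs, src, fs, dst, false, false, conf);
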
http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a479deb..f68bd35 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1162,9 +1162,12 @@ public class HiveConf extends Configuration {
     HIVE_GROUPBY_LIMIT_EXTRASTEP("hive.groupby.limit.extrastep", true, "This parameter decides if Hive should \n" +
         "create new MR job for sorting final output"),
 
-    // Max filesize used to do a single copy (after that, distcp is used)
+    // Max file num and size used to do a single copy (after that, distcp is used)
+    HIVE_EXEC_COPYFILE_MAXNUMFILES("hive.exec.copyfile.maxnumfiles", 1L,
+        "Maximum number of files Hive uses to do sequential HDFS copies between directories." +
+        "Distributed copies (distcp) will be used instead for larger numbers of files so that copies can be done faster."),
     HIVE_EXEC_COPYFILE_MAXSIZE("hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/,
-        "Maximum file size (in Mb) that Hive uses to do single HDFS copies between directories." +
+        "Maximum file size (in bytes) that Hive uses to do single HDFS copies between directories." +
         "Distributed copies (distcp) will be used instead for bigger files so that copies can be done faster."),
 
     // for hive udtf operator

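Note that the new check ANDs the two thresholds: distcp is used only when the source exceeds both the file-count and the byte-size limits. A hedged sketch of tightening them programmatically, with deliberately tiny illustrative values (the shipped defaults are 1 file and 32MB):

    HiveConf conf = new HiveConf();
    // Force distcp for any source larger than one file and one byte
    // (mirrors what the integration test below does via conf.set).
    conf.setLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES, 1L);
    conf.setLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE, 1L);
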
http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
index 5705028..03fcaeb 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -22,18 +22,28 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.HadoopShims;
+
 import org.junit.Assert;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -197,4 +207,26 @@ public class TestFileUtils {
     boolean result = parents.contains(key);
     assertEquals("key=" + key, expected, result);
   }
+
+  @Test
+  public void testCopyWithDistcp() throws IOException {
+    Path copySrc = new Path("copySrc");
+    Path copyDst = new Path("copyDst");
+    HiveConf conf = new HiveConf(TestFileUtils.class);
+    conf.set(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS.varname, "false");
+
+    FileSystem mockFs = mock(FileSystem.class);
+    when(mockFs.getUri()).thenReturn(URI.create("hdfs:///"));
+
+    ContentSummary mockContentSummary = mock(ContentSummary.class);
+    when(mockContentSummary.getFileCount()).thenReturn(Long.MAX_VALUE);
+    when(mockContentSummary.getLength()).thenReturn(Long.MAX_VALUE);
+    when(mockFs.getContentSummary(any(Path.class))).thenReturn(mockContentSummary);
+
+    HadoopShims shims = mock(HadoopShims.class);
+    when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(true);
+
+    Assert.assertTrue(FileUtils.copy(mockFs, copySrc, mockFs, copyDst, false, false, conf, shims));
+    verify(shims).runDistCp(copySrc, copyDst, conf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/itests/hive-unit-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml
index 44135d6..d15bd54 100644
--- a/itests/hive-unit-hadoop2/pom.xml
+++ b/itests/hive-unit-hadoop2/pom.xml
@@ -152,6 +152,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <version>${hadoop.version}</version>
       <classifier>tests</classifier>

http://git-wip-us.apache.org/repos/asf/hive/blob/9d3c33b1/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java
new file mode 100644
index 0000000..f143315
--- /dev/null
+++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -0,0 +1,103 @@
+package org.apache.hadoop.hive.common;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration tests for {@link FileUtils}. Tests run against a {@link HadoopShims.MiniDFSShim}.
+ */
+public class TestFileUtils {
+
+  private static final Path basePath = new Path("/tmp/");
+
+  private static HiveConf conf;
+  private static FileSystem fs;
+  private static HadoopShims.MiniDFSShim dfs;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new HiveConf(TestFileUtils.class);
+    dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null);
+    fs = dfs.getFileSystem();
+  }
+
+  @Test
+  public void testCopySingleEmptyFile() throws IOException {
+    String file1Name = "file1.txt";
+    Path copySrc = new Path(basePath, "copySrc");
+    Path copyDst = new Path(basePath, "copyDst");
+    try {
+      fs.create(new Path(basePath, new Path(copySrc, file1Name))).close();
+      Assert.assertTrue("FileUtils.copy failed to copy data",
+              FileUtils.copy(fs, copySrc, fs, copyDst, false, false, conf));
+
+      Path dstFileName1 = new Path(copyDst, file1Name);
+      Assert.assertTrue(fs.exists(new Path(copyDst, file1Name)));
+      Assert.assertEquals(fs.getFileStatus(dstFileName1).getLen(), 0);
+    } finally {
+      try {
+        fs.delete(copySrc, true);
+        fs.delete(copyDst, true);
+      } catch (IOException e) {
+        // Do nothing
+      }
+    }
+  }
+
+  @Test
+  public void testCopyWithDistcp() throws IOException {
+    String file1Name = "file1.txt";
+    String file2Name = "file2.txt";
+    Path copySrc = new Path(basePath, "copySrc");
+    Path copyDst = new Path(basePath, "copyDst");
+    Path srcFile1 = new Path(basePath, new Path(copySrc, file1Name));
+    Path srcFile2 = new Path(basePath, new Path(copySrc, file2Name));
+    try {
+      OutputStream os1 = fs.create(srcFile1);
+      os1.write(new byte[]{1, 2, 3});
+      os1.close();
+
+      OutputStream os2 = fs.create(srcFile2);
+      os2.write(new byte[]{1, 2, 3});
+      os2.close();
+
+      conf.set(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname, "1");
+      conf.set(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1");
+      Assert.assertTrue("FileUtils.copy failed to copy data",
+              FileUtils.copy(fs, copySrc, fs, copyDst, false, false, conf));
+
+      Path dstFileName1 = new Path(copyDst, file1Name);
+      Assert.assertTrue(fs.exists(new Path(copyDst, file1Name)));
+      Assert.assertEquals(fs.getFileStatus(dstFileName1).getLen(), 3);
+
+      Path dstFileName2 = new Path(copyDst, file2Name);
+      Assert.assertTrue(fs.exists(new Path(copyDst, file2Name)));
+      Assert.assertEquals(fs.getFileStatus(dstFileName2).getLen(), 3);
+    } finally {
+      try {
+        fs.delete(copySrc, true);
+        fs.delete(copyDst, true);
+      } catch (IOException e) {
+        // Do nothing
+      }
+    }
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    fs.close();
+    dfs.shutdown();
+  }
+}

