hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From khorg...@apache.org
Subject hive git commit: HIVE-16686 : repl invocations of distcp needs additional handling (Sushanth Sowmyan, reviewed by Thejas Nair)
Date Wed, 24 May 2017 16:38:13 GMT
Repository: hive
Updated Branches:
  refs/heads/master dec96ca6f -> 1c891ad4e


HIVE-16686 : repl invocations of distcp needs additional handling (Sushanth Sowmyan, reviewed
by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c891ad4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c891ad4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c891ad4

Branch: refs/heads/master
Commit: 1c891ad4e228aee22aae6c4f8ab572f8867ea441
Parents: dec96ca
Author: Sushanth Sowmyan <khorgath@gmail.com>
Authored: Mon May 22 13:55:20 2017 -0700
Committer: Sushanth Sowmyan <khorgath@gmail.com>
Committed: Wed May 24 09:38:10 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    | 42 +++++++--
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  5 +
 .../hadoop/hive/common/TestFileUtils.java       | 22 +++++
 .../hadoop/hive/ql/exec/ReplCopyTask.java       | 15 ++-
 shims/0.23/pom.xml                              | 10 +-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java | 46 +++++++++-
 .../hadoop/hive/shims/TestHadoop23Shims.java    | 96 ++++++++++++++++++++
 .../apache/hadoop/hive/shims/HadoopShims.java   | 14 +++
 8 files changed, 234 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 985fd8c..c0388f6 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -583,13 +583,23 @@ public final class FileUtils {
    * Copies files between filesystems.
    */
   public static boolean copy(FileSystem srcFS, Path src,
-    FileSystem dstFS, Path dst,
-    boolean deleteSource,
-    boolean overwrite,
-    HiveConf conf) throws IOException {
+      FileSystem dstFS, Path dst,
+      boolean deleteSource,
+      boolean overwrite,
+      HiveConf conf) throws IOException {
     return copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf, ShimLoader.getHadoopShims());
   }
 
+  /**
+   * Copies files between filesystems as a fs super user using distcp, and runs
+   * as a privileged user.
+   */
+  public static boolean privilegedCopy(FileSystem srcFS, Path src, Path dst,
+      HiveConf conf) throws IOException {
+    String privilegedUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    return distCp(srcFS, src, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims());
+  }
+
   @VisibleForTesting
   static boolean copy(FileSystem srcFS, Path src,
     FileSystem dstFS, Path dst,
@@ -612,18 +622,34 @@ public final class FileUtils {
                 HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")");
         LOG.info("Launch distributed copy (distcp) job.");
         triedDistcp = true;
-        copied = shims.runDistCp(src, dst, conf);
-        if (copied && deleteSource) {
-          srcFS.delete(src, true);
-        }
+        copied = distCp(srcFS, src, dst, deleteSource, null, conf, shims);
       }
     }
     if (!triedDistcp) {
+      // Note : Currently, this implementation does not "fall back" to regular copy if distcp
+      // is tried and it fails. We depend upon that behaviour in cases like replication,
+      // wherein if distcp fails, there is good reason to not plod along with a trivial
+      // implementation, and fail instead.
       copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
     }
     return copied;
   }
 
+  static boolean distCp(FileSystem srcFS, Path src, Path dst,
+      boolean deleteSource, String doAsUser,
+      HiveConf conf, HadoopShims shims) throws IOException {
+    boolean copied = false;
+    if (doAsUser == null){
+      copied = shims.runDistCp(src, dst, conf);
+    } else {
+      copied = shims.runDistCpAs(src, dst, conf, doAsUser);
+    }
+    if (copied && deleteSource) {
+      srcFS.delete(src, true);
+    }
+    return copied;
+  }
+
   /**
    * Move a particular file or directory to the trash.
    * @param fs FileSystem to use

http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 06332ac..2dfc8b6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2611,6 +2611,10 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_ENABLE_DOAS("hive.server2.enable.doAs", true,
         "Setting this property to true will have HiveServer2 execute\n" +
         "Hive operations as the user making the calls to it."),
+    HIVE_DISTCP_DOAS_USER("hive.distcp.privileged.doAs","hdfs",
+        "This property allows privileged distcp executions done by hive\n" +
+        "to run as this user. Typically, it should be the user you\n" +
+        "run the namenode as, such as the 'hdfs' user."),
     HIVE_SERVER2_TABLE_TYPE_MAPPING("hive.server2.table.type.mapping", "CLASSIC", new StringSet("CLASSIC",
"HIVE"),
         "This setting reflects how HiveServer2 will report the table types for JDBC and other\n"
+
         "client implementations that retrieve the available tables and supported table types\n"
+
@@ -3401,6 +3405,7 @@ public class HiveConf extends Configuration {
         "hive.security.authenticator.manager,hive.security.authorization.manager," +
         "hive.security.metastore.authorization.manager,hive.security.metastore.authenticator.manager,"
+
         "hive.users.in.admin.role,hive.server2.xsrf.filter.enabled,hive.security.authorization.enabled,"
+
+            "hive.distcp.privileged.doAs," +
             "hive.server2.authentication.ldap.baseDN," +
             "hive.server2.authentication.ldap.url," +
             "hive.server2.authentication.ldap.Domain," +

http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
index adc9b0c..d3c8761 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -228,4 +228,26 @@ public class TestFileUtils {
     Assert.assertTrue(FileUtils.copy(mockFs, copySrc, mockFs, copyDst, false, false, conf,
shims));
     verify(shims).runDistCp(copySrc, copyDst, conf);
   }
+
+  @Test
+  public void testCopyWithDistCpAs() throws IOException {
+    Path copySrc = new Path("copySrc");
+    Path copyDst = new Path("copyDst");
+    HiveConf conf = new HiveConf(TestFileUtils.class);
+
+    FileSystem fs = copySrc.getFileSystem(conf);
+
+    String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+
+    HadoopShims shims = mock(HadoopShims.class);
+    when(shims.runDistCpAs(copySrc, copyDst, conf, doAsUser)).thenReturn(true);
+    when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(false);
+
+    // doAs when asked
+    Assert.assertTrue(FileUtils.distCp(fs, copySrc, copyDst, true, doAsUser, conf, shims));
+    verify(shims).runDistCpAs(copySrc, copyDst, conf, doAsUser);
+    // don't doAs when not asked
+    Assert.assertFalse(FileUtils.distCp(fs, copySrc, copyDst, true, null, conf, shims));
+    verify(shims).runDistCp(copySrc, copyDst, conf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index d2f9e79..f277284 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -139,10 +139,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements
Serializable {
         if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
 
           LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
-          if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
-            false, // delete source
-            true, // overwrite destination
-            conf)) {
+          if (!doCopy(toPath, dstFs, oneSrc.getPath(), actualSrcFs)) {
           console.printError("Failed to copy: '" + oneSrc.getPath().toString()
               + "to: '" + toPath.toString() + "'");
           return 1;
@@ -169,6 +166,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements
Serializable {
     }
   }
 
+  private boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs) throws IOException
{
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)){
+      // regular copy in test env.
+      return FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf);
+    } else {
+      // distcp in actual deployment with privilege escalation
+      return FileUtils.privilegedCopy(srcFs, src, dst, conf);
+    }
+  }
+
 
   private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/pom.xml
----------------------------------------------------------------------
diff --git a/shims/0.23/pom.xml b/shims/0.23/pom.xml
index 7c586fa..3ff1d38 100644
--- a/shims/0.23/pom.xml
+++ b/shims/0.23/pom.xml
@@ -205,6 +205,14 @@
      <version>${hadoop.version}</version>
      <scope>provided</scope>
    </dependency>
+   <dependency>
+     <groupId>junit</groupId>
+     <artifactId>junit</artifactId>
+     <scope>test</scope>
+   </dependency>
   </dependencies>
-
+  <build>
+    <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/main/test</testSourceDirectory>
+  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 0483e91..4319bed 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -27,6 +27,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,6 +38,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -1081,16 +1084,53 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     }
   }
 
+  private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
+
+  List<String> constructDistCpParams(Path src, Path dst, Configuration conf) {
+    List<String> params = new ArrayList<String>();
+    for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
+      String distCpOption = entry.getKey();
+      String distCpVal = entry.getValue();
+      params.add("-" + distCpOption);
+      if ((distCpVal != null) && (!distCpVal.isEmpty())){
+        params.add(distCpVal);
+      }
+    }
+    if (params.size() == 0){
+      // if no entries were added via conf, we initiate our defaults
+      params.add("-update");
+      params.add("-skipcrccheck");
+    }
+    params.add(src.toString());
+    params.add(dst.toString());
+    return params;
+  }
+
   @Override
-  public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException {
+  public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws
IOException {
+    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+        doAsUser, UserGroupInformation.getLoginUser());
+    try {
+      return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          return runDistCp(src, dst, conf);
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
 
+  @Override
+  public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException {
     DistCpOptions options = new DistCpOptions(Collections.singletonList(src), dst);
     options.setSyncFolder(true);
     options.setSkipCRC(true);
     options.preserve(FileAttribute.BLOCKSIZE);
 
     // Creates the command-line parameters for distcp
-    String[] params = {"-update", "-skipcrccheck", src.toString(), dst.toString()};
+    List<String> params = constructDistCpParams(src, dst, conf);
 
     try {
       conf.setBoolean("mapred.mapper.new-api", true);
@@ -1098,7 +1138,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
 
       // HIVE-13704 states that we should use run() instead of execute() due to a hadoop
known issue
       // added by HADOOP-10459
-      if (distcp.run(params) == 0) {
+      if (distcp.run(params.toArray(new String[0])) == 0) {
         return true;
       } else {
         return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
new file mode 100644
index 0000000..ba1086c
--- /dev/null
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestHadoop23Shims {
+
+  @Test
+  public void testConstructDistCpParams() {
+    Path copySrc = new Path("copySrc");
+    Path copyDst = new Path("copyDst");
+    Configuration conf = new Configuration();
+
+    Hadoop23Shims shims = new Hadoop23Shims();
+    List<String> paramsDefault = shims.constructDistCpParams(copySrc, copyDst, conf);
+
+    assertEquals(4, paramsDefault.size());
+    assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
+    assertTrue("Distcp -skipcrccheck set by default", paramsDefault.contains("-skipcrccheck"));
+    assertEquals(copySrc.toString(), paramsDefault.get(2));
+    assertEquals(copyDst.toString(), paramsDefault.get(3));
+
+    conf.set("distcp.options.foo", "bar"); // should set "-foo bar"
+    conf.set("distcp.options.blah", ""); // should set "-blah"
+    conf.set("dummy", "option"); // should be ignored.
+    List<String> paramsWithCustomParamInjection =
+        shims.constructDistCpParams(copySrc, copyDst, conf);
+
+    assertEquals(5, paramsWithCustomParamInjection.size());
+
+    // check that the defaults did not remain.
+    assertTrue("Distcp -update not set if not requested",
+        !paramsWithCustomParamInjection.contains("-update"));
+    assertTrue("Distcp -skipcrccheck not set if not requested",
+        !paramsWithCustomParamInjection.contains("-skipcrccheck"));
+
+    // the "-foo bar" and "-blah" params order is not guaranteed
+    String firstParam = paramsWithCustomParamInjection.get(0);
+    if (firstParam.equals("-foo")){
+      // "-foo bar -blah"  form
+      assertEquals("bar", paramsWithCustomParamInjection.get(1));
+      assertEquals("-blah", paramsWithCustomParamInjection.get(2));
+    } else {
+      // "-blah -foo bar" form
+      assertEquals("-blah", paramsWithCustomParamInjection.get(0));
+      assertEquals("-foo", paramsWithCustomParamInjection.get(1));
+      assertEquals("bar", paramsWithCustomParamInjection.get(2));
+    }
+
+    // the dummy option should not have made it either - only options
+    // beginning with distcp.options. should be honoured
+    assertTrue(!paramsWithCustomParamInjection.contains("dummy"));
+    assertTrue(!paramsWithCustomParamInjection.contains("-dummy"));
+    assertTrue(!paramsWithCustomParamInjection.contains("option"));
+    assertTrue(!paramsWithCustomParamInjection.contains("-option"));
+
+    assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3));
+    assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1c891ad4/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index c280d49..d08ad04 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -479,6 +479,20 @@ public interface HadoopShims {
   /**
    * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes.
    * This distributed process is meant to copy huge files that could take some time if a
single
+   * copy is done. This is a variation which allows proxying as a different user to perform
+   * the distcp, and requires that the caller have requisite proxy user privileges.
+   *
+   * @param src Path to the source file or directory to copy
+   * @param dst Path to the destination file or directory
+   * @param conf The hadoop configuration object
+   * @param doAsUser The user to perform the distcp as
+   * @return True if it is successfull; False otherwise.
+   */
+  public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws
IOException;
+
+  /**
+   * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes.
+   * This distributed process is meant to copy huge files that could take some time if a
single
    * copy is done.
    *
    * @param src Path to the source file or directory to copy


Mime
View raw message