hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject hive git commit: HIVE-16901: Distcp optimization - One distcp per ReplCopyTask (Sankar Hariappan, reviewed by Anishek Agarwal, Daniel Dai)
Date Wed, 05 Jul 2017 05:51:48 GMT
Repository: hive
Updated Branches:
  refs/heads/master d68630b6e -> 91919d3f7


HIVE-16901: Distcp optimization - One distcp per ReplCopyTask (Sankar Hariappan, reviewed
by Anishek Agarwal, Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/91919d3f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/91919d3f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/91919d3f

Branch: refs/heads/master
Commit: 91919d3f72bb0a2fdbaf3649dca830354fdf66b1
Parents: d68630b
Author: Daniel Dai <daijy@hortonworks.com>
Authored: Tue Jul 4 22:51:29 2017 -0700
Committer: Daniel Dai <daijy@hortonworks.com>
Committed: Tue Jul 4 22:51:29 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    | 20 +++---
 .../hadoop/hive/common/TestFileUtils.java       | 17 ++---
 .../hadoop/hive/ql/exec/ReplCopyTask.java       | 73 +++++++++++++-------
 .../ql/parse/repl/dump/io/FileOperations.java   |  8 ++-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java | 18 ++---
 .../hadoop/hive/shims/TestHadoop23Shims.java    |  5 +-
 .../apache/hadoop/hive/shims/HadoopShims.java   | 10 ++-
 7 files changed, 90 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/91919d3f/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index c0388f6..e8a3a7a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
@@ -48,13 +49,10 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConfUtil;
-import org.apache.hadoop.hive.io.HdfsUtils;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
@@ -594,10 +592,10 @@ public final class FileUtils {
    * Copies files between filesystems as a fs super user using distcp, and runs
    * as a privileged user.
    */
-  public static boolean privilegedCopy(FileSystem srcFS, Path src, Path dst,
+  public static boolean privilegedCopy(FileSystem srcFS, List<Path> srcPaths, Path
dst,
       HiveConf conf) throws IOException {
     String privilegedUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
-    return distCp(srcFS, src, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims());
+    return distCp(srcFS, srcPaths, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims());
   }
 
   @VisibleForTesting
@@ -622,7 +620,7 @@ public final class FileUtils {
                 HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")");
         LOG.info("Launch distributed copy (distcp) job.");
         triedDistcp = true;
-        copied = distCp(srcFS, src, dst, deleteSource, null, conf, shims);
+        copied = distCp(srcFS, Collections.singletonList(src), dst, deleteSource, null, conf,
shims);
       }
     }
     if (!triedDistcp) {
@@ -635,17 +633,19 @@ public final class FileUtils {
     return copied;
   }
 
-  static boolean distCp(FileSystem srcFS, Path src, Path dst,
+  public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
       boolean deleteSource, String doAsUser,
       HiveConf conf, HadoopShims shims) throws IOException {
     boolean copied = false;
     if (doAsUser == null){
-      copied = shims.runDistCp(src, dst, conf);
+      copied = shims.runDistCp(srcPaths, dst, conf);
     } else {
-      copied = shims.runDistCpAs(src, dst, conf, doAsUser);
+      copied = shims.runDistCpAs(srcPaths, dst, conf, doAsUser);
     }
     if (copied && deleteSource) {
-      srcFS.delete(src, true);
+      for (Path path : srcPaths) {
+        srcFS.delete(path, true);
+      }
     }
     return copied;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/91919d3f/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
index d3c8761..3cdca20 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -31,6 +31,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -223,10 +224,10 @@ public class TestFileUtils {
     when(mockFs.getContentSummary(any(Path.class))).thenReturn(mockContentSummary);
 
     HadoopShims shims = mock(HadoopShims.class);
-    when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(true);
+    when(shims.runDistCp(Collections.singletonList(copySrc), copyDst, conf)).thenReturn(true);
 
     Assert.assertTrue(FileUtils.copy(mockFs, copySrc, mockFs, copyDst, false, false, conf,
shims));
-    verify(shims).runDistCp(copySrc, copyDst, conf);
+    verify(shims).runDistCp(Collections.singletonList(copySrc), copyDst, conf);
   }
 
   @Test
@@ -240,14 +241,14 @@ public class TestFileUtils {
     String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
 
     HadoopShims shims = mock(HadoopShims.class);
-    when(shims.runDistCpAs(copySrc, copyDst, conf, doAsUser)).thenReturn(true);
-    when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(false);
+    when(shims.runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser)).thenReturn(true);
+    when(shims.runDistCp(Collections.singletonList(copySrc), copyDst, conf)).thenReturn(false);
 
     // doAs when asked
-    Assert.assertTrue(FileUtils.distCp(fs, copySrc, copyDst, true, doAsUser, conf, shims));
-    verify(shims).runDistCpAs(copySrc, copyDst, conf, doAsUser);
+    Assert.assertTrue(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true,
doAsUser, conf, shims));
+    verify(shims).runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser);
     // don't doAs when not asked
-    Assert.assertFalse(FileUtils.distCp(fs, copySrc, copyDst, true, null, conf, shims));
-    verify(shims).runDistCp(copySrc, copyDst, conf);
+    Assert.assertFalse(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst,
true, null, conf, shims));
+    verify(shims).runDistCp(Collections.singletonList(copySrc), copyDst, conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/91919d3f/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 8e7704d..7330f56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -33,6 +33,7 @@ import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -98,7 +99,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable
{
       List<FileStatus> srcFiles = new ArrayList<>();
       FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
       LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length));
-      if (! rwork.getReadListFromInput()){
+      if (!rwork.getReadListFromInput()) {
         if (srcs == null || srcs.length == 0) {
           if (work.isErrorOnSrcEmpty()) {
             console.printError("No files matching path: " + fromPath.toString());
@@ -132,7 +133,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements
Serializable {
       }
 
       BufferedWriter listBW = null;
-      if (rwork.getListFilesOnOutputBehaviour()){
+      if (rwork.getListFilesOnOutputBehaviour()) {
         Path listPath = new Path(toPath,EximUtil.FILES_NAME);
         LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString());
         if (dstFs.exists(listPath)){
@@ -144,27 +145,33 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements
Serializable {
         // later(for cases where filenames have unicode chars)
       }
 
+      HashMap<FileSystem, List<Path>> srcMap = new HashMap<>();
       for (FileStatus oneSrc : srcFiles) {
         console.printInfo("Copying file: " + oneSrc.getPath().toString());
         LOG.debug("Copying file: " + oneSrc.getPath().toString());
 
         FileSystem actualSrcFs = null;
-        if (rwork.getReadListFromInput()){
+        if (rwork.getReadListFromInput()) {
           // TODO : filesystemcache prevents this from being a perf nightmare, but we
           // should still probably follow up to see if we need to do something better here.
           actualSrcFs = oneSrc.getPath().getFileSystem(conf);
         } else {
           actualSrcFs = srcFs;
         }
-        if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
 
+        if (!rwork.getListFilesOnOutputBehaviour(oneSrc)) {
           LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
-          if (!doCopy(toPath, dstFs, oneSrc.getPath(), actualSrcFs, conf)) {
-          console.printError("Failed to copy: '" + oneSrc.getPath().toString()
-              + "to: '" + toPath.toString() + "'");
-          return 1;
+
+          // We just make the list of files to be copied using distCp.
+          // If files come from different file system, then just make separate lists for
each filesystem.
+          if (srcMap.containsKey(actualSrcFs)) {
+            srcMap.get(actualSrcFs).add(oneSrc.getPath());
+          } else {
+            List<Path> srcPaths = new ArrayList<>();
+            srcPaths.add(oneSrc.getPath());
+            srcMap.put(actualSrcFs, srcPaths);
           }
-        }else{
+        } else {
           LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
           console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
           String chksumString = ReplChangeManager.checksumFor(oneSrc.getPath(), actualSrcFs);
@@ -177,8 +184,21 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements
Serializable {
         listBW.close();
       }
 
-      return 0;
+      // If the srcMap is not empty, it means we made the list of files for distCp.
+      // If there are files from different filesystems, then the map will have multiple entries.
+      if (!srcMap.isEmpty()) {
+        for (final HashMap.Entry<FileSystem, List<Path>> entry : srcMap.entrySet())
{
+          FileSystem actualSrcFs = entry.getKey();
+          List<Path> srcPaths = entry.getValue();
+          if (!doCopy(toPath, dstFs, srcPaths, actualSrcFs, conf)) {
+            console.printError("Failed to copy: " + srcPaths.size()
+                    + " files to: '" + toPath.toString() + "'");
+            return 1;
+          }
+        }
+      }
 
+      return 0;
     } catch (Exception e) {
       console.printError("Failed with exception " + e.getMessage(), "\n"
           + StringUtils.stringifyException(e));
@@ -186,26 +206,31 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements
Serializable {
     }
   }
 
-  public static boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs,
-      HiveConf conf) throws IOException {
+  public static boolean doCopy(Path dst, FileSystem dstFs, List<Path> srcPaths, FileSystem
srcFs,
+              HiveConf conf) throws IOException {
+    boolean result = true;
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)
-        || isLocalFile(src) || isLocalFile(dst)){
-      // regular copy in test env, or when source or destination is a local file
-      // distcp runs inside a mapper task, and cannot handle file:///
-      LOG.debug("Using regular copy for {} -> {}", src.toUri(), dst.toUri());
-      return FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf);
+            || isLocalFileSystem(dstFs) || isLocalFileSystem(srcFs)) {
+      for (final Path src : srcPaths) {
+        // regular copy in test env, or when source or destination is a local file
+        // distcp runs inside a mapper task, and cannot handle file:///
+        LOG.debug("Using regular copy for {} -> {}", src.toUri(), dst.toUri());
+        if (!FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf)) {
+          result = false;
+        }
+      }
     } else {
       // distcp in actual deployment with privilege escalation
-      LOG.debug("Using privleged distcp for {} -> {}", src.toUri(), dst.toUri());
-      return FileUtils.privilegedCopy(srcFs, src, dst, conf);
+      result = FileUtils.privilegedCopy(srcFs, srcPaths, dst, conf);
     }
+    return result;
   }
 
-  private static boolean isLocalFile(Path p) {
-    String scheme = p.toUri().getScheme();
-    boolean isLocalFile = scheme.equalsIgnoreCase("file");
-    LOG.debug("{} was a local file? {}, had scheme {}",p.toUri(), isLocalFile, scheme);
-    return isLocalFile;
+  private static boolean isLocalFileSystem(FileSystem fs) {
+    String scheme = fs.getScheme();
+    boolean isLocalFileSystem = scheme.equalsIgnoreCase("file");
+    LOG.debug("Scheme {} was a local file system? {}", scheme, isLocalFileSystem);
+    return isLocalFileSystem;
   }
 
   private List<FileStatus> filesInFileListing(FileSystem fs, Path path)

http://git-wip-us.apache.org/repos/asf/hive/blob/91919d3f/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 61e004f..164ca74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
 
 public class FileOperations {
   private static Logger logger = LoggerFactory.getLogger(FileOperations.class);
@@ -64,10 +66,12 @@ public class FileOperations {
   private void copyFiles() throws IOException {
     FileStatus[] fileStatuses =
         LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath);
+    List<Path> srcPaths = new ArrayList<>();
     for (FileStatus fileStatus : fileStatuses) {
-      ReplCopyTask.doCopy(exportRootDataDir, exportFileSystem, fileStatus.getPath(), dataFileSystem,
-          hiveConf);
+      srcPaths.add(fileStatus.getPath());
     }
+
+    ReplCopyTask.doCopy(exportRootDataDir, exportFileSystem, srcPaths, dataFileSystem, hiveConf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/91919d3f/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index e3d1199..a2e0abd 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -29,7 +29,6 @@ import java.security.AccessControlException;
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -38,7 +37,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.security.auth.Subject;
-import javax.security.auth.login.LoginException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -1086,7 +1084,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
 
   private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
 
-  List<String> constructDistCpParams(Path src, Path dst, Configuration conf) {
+  List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration
conf) {
     List<String> params = new ArrayList<String>();
     for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
       String distCpOption = entry.getKey();
@@ -1102,20 +1100,22 @@ public class Hadoop23Shims extends HadoopShimsSecure {
       params.add("-skipcrccheck");
       params.add("-pb");
     }
-    params.add(src.toString());
+    for (Path src : srcPaths) {
+      params.add(src.toString());
+    }
     params.add(dst.toString());
     return params;
   }
 
   @Override
-  public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws
IOException {
+  public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String
doAsUser) throws IOException {
     UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
         doAsUser, UserGroupInformation.getLoginUser());
     try {
       return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
         @Override
         public Boolean run() throws Exception {
-          return runDistCp(src, dst, conf);
+          return runDistCp(srcPaths, dst, conf);
         }
       });
     } catch (InterruptedException e) {
@@ -1124,14 +1124,14 @@ public class Hadoop23Shims extends HadoopShimsSecure {
   }
 
   @Override
-  public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException {
-    DistCpOptions options = new DistCpOptions(Collections.singletonList(src), dst);
+  public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) throws
IOException {
+    DistCpOptions options = new DistCpOptions(srcPaths, dst);
     options.setSyncFolder(true);
     options.setSkipCRC(true);
     options.preserve(FileAttribute.BLOCKSIZE);
 
     // Creates the command-line parameters for distcp
-    List<String> params = constructDistCpParams(src, dst, conf);
+    List<String> params = constructDistCpParams(srcPaths, dst, conf);
 
     try {
       conf.setBoolean("mapred.mapper.new-api", true);

http://git-wip-us.apache.org/repos/asf/hive/blob/91919d3f/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
index 6c93df5..a73dc77 100644
--- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.tools.DistCpOptions;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -46,7 +47,7 @@ public class TestHadoop23Shims {
     Configuration conf = new Configuration();
 
     Hadoop23Shims shims = new Hadoop23Shims();
-    List<String> paramsDefault = shims.constructDistCpParams(copySrc, copyDst, conf);
+    List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc),
copyDst, conf);
 
     assertEquals(5, paramsDefault.size());
     assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
@@ -59,7 +60,7 @@ public class TestHadoop23Shims {
     conf.set("distcp.options.blah", ""); // should set "-blah"
     conf.set("dummy", "option"); // should be ignored.
     List<String> paramsWithCustomParamInjection =
-        shims.constructDistCpParams(copySrc, copyDst, conf);
+        shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
 
     assertEquals(5, paramsWithCustomParamInjection.size());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/91919d3f/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index d08ad04..0db54d1 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.shims;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -43,7 +42,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobProfile;
@@ -482,25 +480,25 @@ public interface HadoopShims {
    * copy is done. This is a variation which allows proxying as a different user to perform
    * the distcp, and requires that the caller have requisite proxy user privileges.
    *
-   * @param src Path to the source file or directory to copy
+   * @param srcPaths List of Path to the source files or directories to copy
    * @param dst Path to the destination file or directory
    * @param conf The hadoop configuration object
    * @param doAsUser The user to perform the distcp as
   * @return True if it is successful; False otherwise.
    */
-  public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws
IOException;
+  public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String
doAsUser) throws IOException;
 
   /**
    * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes.
    * This distributed process is meant to copy huge files that could take some time if a
single
    * copy is done.
    *
-   * @param src Path to the source file or directory to copy
+   * @param srcPaths List of Path to the source files or directories to copy
    * @param dst Path to the destination file or directory
    * @param conf The hadoop configuration object
   * @return True if it is successful; False otherwise.
    */
-  public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException;
+  public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) throws
IOException;
 
   /**
    * This interface encapsulates methods used to get encryption information from


Mime
View raw message