hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r627143 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/java/org/apache/hadoop/util/CopyFiles.java src/test/org/apache/hadoop/fs/TestCopyFiles.java
Date Tue, 12 Feb 2008 22:01:03 GMT
Author: cdouglas
Date: Tue Feb 12 14:01:01 2008
New Revision: 627143

URL: http://svn.apache.org/viewvc?rev=627143&view=rev
Log:
HADOOP-2725. Modify distcp to avoid leaving partially copied files at
the destination after encountering an error.
Contributed by Tsz Wo (Nicholas), SZE


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=627143&r1=627142&r2=627143&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Feb 12 14:01:01 2008
@@ -63,6 +63,10 @@
     HADOOP-2733. Fix compiler warnings in test code.
     (Tsz Wo (Nicholas), SZE via cdouglas)
 
+    HADOOP-2725. Modify distcp to avoid leaving partially copied files at
+    the destination after encountering an error. (Tsz Wo (Nicholas), SZE
+    via cdouglas)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=627143&r1=627142&r2=627143&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Feb 12 14:01:01 2008
@@ -757,7 +757,7 @@
   }
   
   /** Write key/value pairs to a sequence-format file. */
-  public static class Writer {
+  public static class Writer implements java.io.Closeable {
     /**
      * A global compressor pool used to save the expensive 
      * construction/destruction of (possibly native) compression codecs.
@@ -1316,7 +1316,7 @@
   } // BlockCompressionWriter
   
   /** Reads key/value pairs from a sequence-format file. */
-  public static class Reader {
+  public static class Reader implements java.io.Closeable {
     /**
      * A global decompressor pool used to save the expensive 
      * construction/destruction of (possibly native) decompression codecs.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=627143&r1=627142&r2=627143&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Tue Feb 12 14:01:01 2008
@@ -25,17 +25,7 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Stack;
-import java.util.StringTokenizer;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,8 +60,10 @@
 public class CopyFiles implements Tool {
   private static final Log LOG = LogFactory.getLog(CopyFiles.class);
 
-  private static final String usage =
-    "distcp [OPTIONS] <srcurl>* <desturl>" +
+  private static final String NAME = "distcp";
+
+  private static final String usage = NAME
+    + " [OPTIONS] <srcurl>* <desturl>" +
     "\n\nOPTIONS:" +
     "\n-p                     Preserve status" +
     "\n-i                     Ignore failures" +
@@ -82,7 +74,7 @@
     "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
     "\n      interpreted as an isomorphic update to an existing directory." +
     "\nFor example:" +
-    "\nhadoop distcp -p -update \"hdfs://A:8020/user/foo/bar\" " +
+    "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
     "\"hdfs://B:8020/user/foo/baz\"\n" +
     "\n     would update all descendants of 'baz' also in 'bar'; it would " +
     "\n     *not* update /user/foo/baz/bar\n";
@@ -92,6 +84,26 @@
   private static final int SYNC_FILE_MAX = 10;
 
   static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
+  static enum Options {
+    IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
+    PRESERVE_STATUS("-p", NAME + ".preserve.status.info"),
+    OVERWRITE("-overwrite", NAME + ".overwrite.always"),
+    UPDATE("-update", NAME + ".overwrite.ifnewer");
+
+    final String cmd, propertyname;
+
+    private Options(String cmd, String propertyname) {
+      this.cmd = cmd;
+      this.propertyname = propertyname;
+    }
+  }
+
+  static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
+  static final String DST_DIR_LABEL = NAME + ".dest.path";
+  static final String JOB_DIR_LABEL = NAME + ".job.dir";
+  static final String SRC_LIST_LABEL = NAME + ".src.list";
+  static final String SRC_COUNT_LABEL = NAME + ".src.count";
+  static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
 
   private JobConf conf;
 
@@ -107,9 +119,6 @@
     return conf;
   }
 
-  @Deprecated
-  public CopyFiles() { }
-
   public CopyFiles(Configuration conf) {
     setConf(conf);
   }
@@ -139,7 +148,7 @@
    * InputFormat of a distcp job responsible for generating splits of the src
    * file list.
    */
-  static class CopyInputFormat implements InputFormat {
+  static class CopyInputFormat implements InputFormat<Text, Text> {
 
     /**
      * Does nothing.
@@ -154,9 +163,9 @@
      */
     public InputSplit[] getSplits(JobConf job, int numSplits)
         throws IOException {
-      int cnfiles = job.getInt("distcp.file.count", -1);
-      long cbsize = job.getLong("distcp.total.size", -1);
-      String srcfilelist = job.get("distcp.src.list", "");
+      int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
+      long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
+      String srcfilelist = job.get(SRC_LIST_LABEL, "");
       if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
         throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
                                    ") total_size(" + cbsize + ") listuri(" +
@@ -197,23 +206,13 @@
     /**
      * Returns a reader for this split of the src file list.
      */
-    public RecordReader getRecordReader(InputSplit split, JobConf job,
-                                 Reporter reporter) throws IOException {
-      return new SequenceFileRecordReader(job, (FileSplit)split);
+    public RecordReader<Text, Text> getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) throws IOException {
+      return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split);
     }
   }
 
   /**
-   * Return true if dst should be replaced by src and the update flag is set.
-   * Right now, this merely checks that the src and dst len are not equal. This
-   * should be improved on once modification times, CRCs, etc. can
-   * be meaningful in this context.
-   */
-  private static boolean needsUpdate(FileStatus src, FileStatus dst) {
-    return src.getLen() != dst.getLen();
-  }
-
-  /**
    * FSCopyFilesMapper: The mapper for copying files between FileSystems.
    */
   public static class FSCopyFilesMapper
@@ -240,6 +239,15 @@
                     " Failed: " + failcount);
     }
 
+    /**
+     * Return true if dst should be replaced by src and the update flag is set.
+     * Right now, this merely checks that the src and dst len are not equal. 
+     * This should be improved on once modification times, CRCs, etc. can
+     * be meaningful in this context.
+     */
+    private boolean needsUpdate(FileStatus src, FileStatus dst) {
+      return update && src.getLen() != dst.getLen();
+    }
 
     /**
      * Copy a file to a destination.
@@ -247,17 +255,22 @@
      * @param dstpath dst path
      * @param reporter
      */
-    private void copy(FileStatus srcstat, Path dstpath,
+    private void copy(FileStatus srcstat, Path relativedst,
         OutputCollector<WritableComparable, Text> outc, Reporter reporter)
         throws IOException {
-
-      int totfiles = job.getInt("distcp.file.count", -1);
+      Path absdst = new Path(destPath, relativedst);
+      int totfiles = job.getInt(SRC_COUNT_LABEL, -1);
       assert totfiles >= 0 : "Invalid file count " + totfiles;
 
       // if a directory, ensure created even if empty
       if (srcstat.isDir()) {
-        if (!destFileSys.mkdirs(dstpath)) {
-          throw new IOException("Failed to create" + dstpath);
+        if (destFileSys.exists(absdst)) {
+          if (!destFileSys.getFileStatus(absdst).isDir()) {
+            throw new IOException("Failed to mkdirs: " + absdst+" is a file.");
+          }
+        }
+        else if (!destFileSys.mkdirs(absdst)) {
+          throw new IOException("Failed to mkdirs " + absdst);
         }
         // TODO: when modification times can be set, directories should be
         // emitted to reducers so they might be preserved. Also, mkdirs does
@@ -265,70 +278,87 @@
         // if this changes, all directory work might as well be done in reduce
         return;
       }
-      Path destParent = dstpath.getParent();
-      if (totfiles > 1) {
-        // create directories to hold destination file
-        if (destParent != null && !destFileSys.mkdirs(destParent)) {
-          throw new IOException("mkdirs failed to create " + destParent);
-        }
-      } else {
-        // Copying a single file; use dst path provided by user as destination
-        // rather than destination directory
-        dstpath = destParent;
+
+      if (destFileSys.exists(absdst) && !overwrite
+          && !needsUpdate(srcstat, destFileSys.getFileStatus(absdst))) {
+        outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
+        ++skipcount;
+        reporter.incrCounter(Counter.SKIP, 1);
+        updateStatus(reporter);
+        return;
       }
 
+      Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
       long cbcopied = 0L;
       FSDataInputStream in = null;
       FSDataOutputStream out = null;
       try {
-        if (destFileSys.exists(dstpath)
-           && (!overwrite && !(update
-               && needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
-          outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
-          ++skipcount;
-          reporter.incrCounter(Counter.SKIP, 1);
-          updateStatus(reporter);
-          return;
-        }
         // open src file
         in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
-        final long cblen = srcstat.getLen();
-        reporter.incrCounter(Counter.BYTESEXPECTED, cblen);
-        // open dst file
+        reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
+        // open tmp file
         out = preserve_status
-          ? destFileSys.create(dstpath, true, sizeBuf, srcstat.getReplication(),
+          ? destFileSys.create(tmpfile, true, sizeBuf, srcstat.getReplication(),
              srcstat.getBlockSize(), reporter)
-          : destFileSys.create(dstpath, reporter);
+          : destFileSys.create(tmpfile, reporter);
         // copy file
         int cbread;
         while ((cbread = in.read(buffer)) >= 0) {
           out.write(buffer, 0, cbread);
           cbcopied += cbread;
-          reporter.setStatus(pcntfmt.format(100.0 * cbcopied / cblen) +
-              " " + dstpath + " [ " +
+          reporter.setStatus(pcntfmt.format(100.0 * cbcopied / srcstat.getLen())
+              + " " + absdst + " [ " +
               StringUtils.humanReadableInt(cbcopied) + " / " +
-              StringUtils.humanReadableInt(cblen) + " ]");
-        }
-        if (cbcopied != cblen) {
-          final String badlen = "ERROR? copied " + cbcopied + " bytes (" +
-              StringUtils.humanReadableInt(cbcopied) + ") expected " +
-              cblen + " bytes (" + StringUtils.humanReadableInt(cblen) +
-              ") from " + srcstat.getPath();
-          LOG.warn(badlen);
-          outc.collect(null, new Text(badlen));
+              StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
         }
       } finally {
-        if (in != null)
-          in.close();
-        if (out != null)
-          out.close();
+        checkAndClose(in);
+        checkAndClose(out);
       }
+
+      final boolean success = cbcopied == srcstat.getLen();
+      if (!success) {
+        final String badlen = "ERROR? copied " + bytesString(cbcopied)
+            + " but expected " + bytesString(srcstat.getLen()) 
+            + " from " + srcstat.getPath();
+        LOG.warn(badlen);
+        outc.collect(null, new Text(badlen));
+      }
+      else {
+        if (totfiles == 1) {
+          // Copying a single file; use dst path provided by user as destination
+          // rather than destination directory
+          absdst = absdst.getParent();
+        }
+        rename(destFileSys, tmpfile, absdst);
+      }
+
       // report at least once for each file
       ++copycount;
       reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
       reporter.incrCounter(Counter.COPY, 1);
       updateStatus(reporter);
     }
+    
+    /** rename tmp to dst, delete dst if already exists */
+    private void rename(FileSystem fs, Path tmp, Path dst) throws IOException {
+      try {
+        if (fs.exists(dst)) {
+          fs.delete(dst);
+        }
+        fs.rename(tmp, dst);
+      }
+      catch(IOException cause) {
+        IOException ioe = new IOException("Fail to rename tmp file (=" + tmp 
+            + ") to destination file (=" + dst + ")");
+        ioe.initCause(cause);
+        throw ioe;
+      }
+    }
+    
+    static String bytesString(long b) {
+      return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
+    }
 
     /** Mapper configuration.
      * Extracts source and destination file system, as well as
@@ -337,7 +367,7 @@
      */
     public void configure(JobConf job)
     {
-      destPath = new Path(job.get("copy.dest.path", "/"));
+      destPath = new Path(job.get(DST_DIR_LABEL, "/"));
       try {
         destFileSys = destPath.getFileSystem(job);
       } catch (IOException ex) {
@@ -345,10 +375,10 @@
       }
       sizeBuf = job.getInt("copy.buf.size", 128 * 1024);
       buffer = new byte[sizeBuf];
-      ignoreReadFailures = job.getBoolean("distcp.ignore.read.failures", false);
-      preserve_status = job.getBoolean("distcp.preserve.status.info", false);
-      update = job.getBoolean("distcp.overwrite.ifnewer", false);
-      overwrite = !update && job.getBoolean("distcp.overwrite.always", false);
+      ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
+      preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false);
+      update = job.getBoolean(Options.UPDATE.propertyname, false);
+      overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
       this.job = job;
     }
 
@@ -438,27 +468,15 @@
     } else {
       tmp.add(src);
     }
-    EnumSet<cpOpts> flags = ignoreReadFailures
-      ? EnumSet.of(cpOpts.IGNORE_READ_FAILURES)
-      : EnumSet.noneOf(cpOpts.class);
+    EnumSet<Options> flags = ignoreReadFailures
+      ? EnumSet.of(Options.IGNORE_READ_FAILURES)
+      : EnumSet.noneOf(Options.class);
     copy(conf, tmp, new Path(destPath), logPath, flags);
   }
 
-  /**
-   * Driver to copy srcPath to destPath depending on required protocol.
-   * @param srcPaths list of source paths
-   * @param destPath Destination path
-   * @param logPath Log output directory
-   * @param flags Command-line flags
-   */
-  public static void copy(Configuration conf, List<Path> srcPaths,
-      Path destPath, Path logPath,
-      EnumSet<cpOpts> flags) throws IOException {
-    //Job configuration
-    JobConf job = new JobConf(conf, CopyFiles.class);
-    job.setJobName("distcp");
-
-    //Sanity check for srcPath/destPath
+  /** Sanity check for srcPath */
+  private static void checkSrcPath(Configuration conf, List<Path> srcPaths
+      ) throws IOException {
     List<IOException> rslt = new ArrayList<IOException>();
     for (Path p : srcPaths) {
       FileSystem fs = p.getFileSystem(conf);
@@ -469,21 +487,35 @@
     if (!rslt.isEmpty()) {
       throw new InvalidInputException(rslt);
     }
+  }
 
+  /**
+   * Driver to copy srcPath to destPath depending on required protocol.
+   * @param srcPaths list of source paths
+   * @param destPath Destination path
+   * @param logPath Log output directory
+   * @param flags Command-line flags
+   */
+  public static void copy(Configuration conf, List<Path> srcPaths,
+      Path destPath, Path logPath,
+      EnumSet<Options> flags) throws IOException {
+    LOG.info("srcPaths=" + srcPaths);
+    LOG.info("destPath=" + destPath);
+    checkSrcPath(conf, srcPaths);
+
+    JobConf job = createJobConf(conf);
     //Initialize the mapper
     try {
       setup(conf, job, srcPaths, destPath, logPath, flags);
       JobClient.runJob(job);
     } finally {
-      cleanup(conf, job);
+      //delete tmp
+      fullyDelete(job.get(TMP_DIR_LABEL), job);
+      //delete jobDirectory
+      fullyDelete(job.get(JOB_DIR_LABEL), job);
     }
   }
 
-  enum cpOpts { IGNORE_READ_FAILURES,
-                PRESERVE_STATUS,
-                OVERWRITE,
-                UPDATE }
-
   /**
    * This is the main driver for recursively copying directories
    * across file systems. It takes at least two cmdline parameters. A source
@@ -496,25 +528,23 @@
     List<Path> srcPath = new ArrayList<Path>();
     Path destPath = null;
     Path logPath = null;
-    EnumSet<cpOpts> flags = EnumSet.noneOf(cpOpts.class);
+    EnumSet<Options> flags = EnumSet.noneOf(Options.class);
 
     for (int idx = 0; idx < args.length; idx++) {
-      if ("-i".equals(args[idx])) {
-        flags.add(cpOpts.IGNORE_READ_FAILURES);
-      } else if ("-p".equals(args[idx])) {
-        flags.add(cpOpts.PRESERVE_STATUS);
-      } else if ("-overwrite".equals(args[idx])) {
-        flags.add(cpOpts.OVERWRITE);
-      } else if ("-update".equals(args[idx])) {
-        flags.add(cpOpts.UPDATE);
-      } else if ("-f".equals(args[idx])) {
+      Options[] opt = Options.values();
+      int i = 0;
+      for(; i < opt.length && !opt[i].cmd.equals(args[idx]); i++);
+
+      if (i < opt.length) {
+        flags.add(opt[i]);
+      }        
+      else if ("-f".equals(args[idx])) {
         if (++idx ==  args.length) {
           System.out.println("urilist_uri not specified");
           System.out.println(usage);
           return -1;
         }
         srcPath.addAll(fetchFileList(conf, new Path(args[idx])));
-
       } else if ("-log".equals(args[idx])) {
         if (++idx ==  args.length) {
           System.out.println("logdir not specified");
@@ -541,13 +571,16 @@
       return -1;
     }
     // incompatible command-line flags
-    if (flags.contains(cpOpts.OVERWRITE) && flags.contains(cpOpts.UPDATE)) {
+    if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
       System.out.println("Conflicting overwrite policies");
       System.out.println(usage);
       return -1;
     }
     try {
       copy(conf, srcPath, destPath, logPath, flags);
+    } catch (DuplicationException e) {
+      System.err.println(StringUtils.stringifyException(e));
+      return DuplicationException.ERROR_CODE;
     } catch (Exception e) {
       System.err.println("With failures, global counters are inaccurate; " +
           "consider running with -i");
@@ -607,22 +640,50 @@
     numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
     return Math.max(numMaps, 1);
   }
-  /**
-   * Delete the temporary dir containing the src file list.
-   * @param conf The dfs/mapred configuration
-   * @param jobConf The handle to the jobConf object
-   */
-  private static void cleanup(Configuration conf, JobConf jobConf)
-      throws IOException {
-    //Clean up jobDirectory
-    String jobDirName = jobConf.get("distdp.job.dir");
-    if (jobDirName != null) {
-      Path jobDirectory = new Path(jobDirName);
-      FileSystem fs = jobDirectory.getFileSystem(jobConf);
-      FileUtil.fullyDelete(fs, jobDirectory);
+
+  /** Fully delete dir */
+  static void fullyDelete(String dir, Configuration conf) throws IOException {
+    if (dir != null) {
+      Path tmp = new Path(dir);
+      FileUtil.fullyDelete(tmp.getFileSystem(conf), tmp);
     }
   }
 
+  //Job configuration
+  private static JobConf createJobConf(Configuration conf) {
+    JobConf jobconf = new JobConf(conf, CopyFiles.class);
+    jobconf.setJobName(NAME);
+
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobconf.setMapSpeculativeExecution(false);
+
+    jobconf.setInputFormat(CopyInputFormat.class);
+    jobconf.setOutputKeyClass(Text.class);
+    jobconf.setOutputValueClass(Text.class);
+
+    jobconf.setMapperClass(FSCopyFilesMapper.class);
+    jobconf.setNumReduceTasks(0);
+    return jobconf;
+  }
+
+  private static final Random RANDOM = new Random();
+  private static String getRandomId() {
+    return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
+  }
+
+  private static boolean setBooleans(JobConf jobConf, EnumSet<Options> flags) {
+    boolean update = flags.contains(Options.UPDATE);
+    boolean overwrite = !update && flags.contains(Options.OVERWRITE);
+    jobConf.setBoolean(Options.UPDATE.propertyname, update);
+    jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
+    jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
+        flags.contains(Options.IGNORE_READ_FAILURES));
+    jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
+        flags.contains(Options.PRESERVE_STATUS));
+    return update || overwrite;
+  }
+
   /**
    * Initialize DFSCopyFileMapper specific job-configuration.
    * @param conf : The dfs/mapred configuration.
@@ -633,46 +694,27 @@
    * @param flags : Command-line flags
    */
   private static void setup(Configuration conf, JobConf jobConf,
-                            List<Path> srcPaths, Path destPath,
-                            Path logPath, EnumSet<cpOpts> flags)
+                            List<Path> srcPaths, final Path destPath,
+                            Path logPath, EnumSet<Options> flags)
       throws IOException {
-    boolean update;
-    boolean overwrite;
-    jobConf.set("copy.dest.path", destPath.toUri().toString());
-
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
-
-    jobConf.setInputFormat(CopyInputFormat.class);
+    jobConf.set(DST_DIR_LABEL, destPath.toUri().toString());
+    final boolean updateORoverwrite = setBooleans(jobConf, flags);
 
-    jobConf.setOutputKeyClass(Text.class);
-    jobConf.setOutputValueClass(Text.class);
+    final String randomId = getRandomId();
+    Path jobDirectory = new Path(jobConf.getSystemDir(), NAME + "_" + randomId);
+    jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
 
-    jobConf.setMapperClass(FSCopyFilesMapper.class);
-
-    jobConf.setNumReduceTasks(0);
-    jobConf.setBoolean("distcp.ignore.read.failures",
-        flags.contains(cpOpts.IGNORE_READ_FAILURES));
-    jobConf.setBoolean("distcp.preserve.status.info",
-        flags.contains(cpOpts.PRESERVE_STATUS));
-    jobConf.setBoolean("distcp.overwrite.ifnewer",
-        update = flags.contains(cpOpts.UPDATE));
-    jobConf.setBoolean("distcp.overwrite.always",
-        overwrite = !update && flags.contains(cpOpts.OVERWRITE));
-
-    Random r = new Random();
-    String randomId = Integer.toString(r.nextInt(Integer.MAX_VALUE), 36);
-    Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + randomId);
-    jobConf.set("distcp.job.dir", jobDirectory.toString());
-    Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
-    jobConf.set("distcp.src.list", srcfilelist.toString());
+    FileSystem dstfs = destPath.getFileSystem(conf);
+    boolean dstExists = dstfs.exists(destPath);
+    boolean dstIsDir = false;
+    if (dstExists) {
+      dstIsDir = dstfs.getFileStatus(destPath).isDir();
+    }
 
     // default logPath
-    FileSystem dstfs = destPath.getFileSystem(conf);
     if (logPath == null) {
       String filename = "_distcp_logs_" + randomId;
-      if (!dstfs.exists(destPath) || !dstfs.getFileStatus(destPath).isDir()) {
+      if (!dstExists || !dstIsDir) {
         Path parent = destPath.getParent();
         dstfs.mkdirs(parent);
         logPath = new Path(parent, filename);
@@ -681,70 +723,139 @@
       }
     }
     jobConf.setOutputPath(logPath);
+    
+    // create src list, dst list
+    FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
+
+    Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
+    jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
+    SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf,
+        srcfilelist, LongWritable.class, FilePair.class,
+        SequenceFile.CompressionType.NONE);
 
-    // create src list
-    SequenceFile.Writer writer = SequenceFile.createWriter(
-        jobDirectory.getFileSystem(jobConf), jobConf, srcfilelist,
-        LongWritable.class, FilePair.class,
+    Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
+    SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf,
+        dstfilelist, Text.class, Text.class,
         SequenceFile.CompressionType.NONE);
 
-    int cnfiles = 0;
-    long cbsize = 0L;
+    // handle the case where the destination directory doesn't exist
+    // and we've only a single src directory OR we're updating/overwriting
+    // the contents of the destination directory.
+    final boolean special =
+      (srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
+    int srcCount = 0, cnsyncf = 0;
+    long cbsize = 0L, cbsyncs = 0L;
     try {
-      // handle the case where the destination directory doesn't exist
-      // and we've only a single src directory OR we're updating/overwriting
-      // the contents of the destination directory.
-      final boolean special_case =
-        (srcPaths.size() == 1 && !dstfs.exists(destPath))
-        || update || overwrite;
-      int cnsyncf = 0;
-      long cbsyncs = 0L;
       for (Path p : srcPaths) {
-        Path root = p.getParent();
         FileSystem fs = p.getFileSystem(conf);
-
-        if (special_case && fs.getFileStatus(p).isDir()) {
-          root = p;
+        boolean pIsDir = fs.getFileStatus(p).isDir();
+        Path root = special && pIsDir? p: p.getParent();
+        if (pIsDir) {
+          ++srcCount;
         }
 
         Stack<Path> pathstack = new Stack<Path>();
         pathstack.push(p);
         while (!pathstack.empty()) {
           for (FileStatus stat : fs.listStatus(pathstack.pop())) {
+            ++srcCount;
+
             if (stat.isDir()) {
               pathstack.push(stat.getPath());
-            } else {
+            }
+            else {
               ++cnsyncf;
               cbsyncs += stat.getLen();
-              ++cnfiles;
               cbsize += stat.getLen();
+
+              if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+                src_writer.sync();
+                dst_writer.sync();
+                cnsyncf = 0;
+                cbsyncs = 0L;
+              }
             }
-            if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
-              writer.sync();
-              cnsyncf = 0;
-              cbsyncs = 0L;
-            }
-            writer.append(new LongWritable(stat.isDir() ? 0 : stat.getLen()),
-                          new FilePair(stat, new Path(destPath,
-                              makeRelative(root, stat.getPath()))));
+
+            Path dst = makeRelative(root, stat.getPath());
+            src_writer.append(new LongWritable(stat.isDir()? 0: stat.getLen()),
+                new FilePair(stat, dst));
+            dst_writer.append(new Text(dst.toString()),
+                new Text(stat.getPath().toString()));
           }
         }
       }
     } finally {
-      writer.close();
+      checkAndClose(src_writer);
+      checkAndClose(dst_writer);
     }
 
     // create dest path dir if copying > 1 file
-    if (cnfiles > 1 && !dstfs.mkdirs(destPath)) {
-      throw new IOException("Failed to create" + destPath);
-    }
+    if (!dstfs.exists(destPath)) {
+      if (srcCount > 1 && !dstfs.mkdirs(destPath)) {
+        throw new IOException("Failed to create" + destPath);
+      }
+    }
+    
+    checkDuplication(jobfs, dstfilelist,
+        new Path(jobDirectory, "_distcp_sorted"), conf);
+
+    Path tmpDir = new Path(
+        (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
+        destPath.getParent(): destPath, "_distcp_tmp_" + randomId);
+    jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
+    LOG.info("srcCount=" + srcCount);
+    jobConf.setInt(SRC_COUNT_LABEL, srcCount);
+    jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
+    jobConf.setNumMapTasks(getMapCount(cbsize,
+        new JobClient(jobConf).getClusterStatus().getTaskTrackers()));
+  }
 
-    jobConf.setInt("distcp.file.count", cnfiles);
-    jobConf.setLong("distcp.total.size", cbsize);
+  static private void checkDuplication(FileSystem fs, Path file, Path sorted,
+    Configuration conf) throws IOException {
+    SequenceFile.Reader in = null;
+    try {
+      SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+        new Text.Comparator(), Text.class, conf);
+      sorter.sort(file, sorted);
+      in = new SequenceFile.Reader(fs, sorted, conf);
+
+      Text prevdst = null, curdst = new Text();
+      Text prevsrc = null, cursrc = new Text(); 
+      for(; in.next(curdst, cursrc); ) {
+        if (prevdst != null && curdst.equals(prevdst)) {
+          throw new DuplicationException(
+            "Invalid input, there are duplicated files in the sources: "
+            + prevsrc + ", " + cursrc);
+        }
+        prevdst = curdst;
+        curdst = new Text();
+        prevsrc = cursrc;
+        cursrc = new Text();
+      }
+    }
+    finally {
+      checkAndClose(in);
+    }
+  } 
 
-    JobClient client = new JobClient(jobConf);
-    jobConf.setNumMapTasks(getMapCount(cbsize,
-          client.getClusterStatus().getTaskTrackers()));
+  static boolean checkAndClose(java.io.Closeable io) {
+    if (io != null) {
+      try {
+        io.close();
+      }
+      catch(IOException ioe) {
+        LOG.warn(StringUtils.stringifyException(ioe));
+        return false;
+      }
+    }
+    return true;
   }
 
+  /** An exception class for duplicated source files. */
+  public static class DuplicationException extends IOException {
+    private static final long serialVersionUID = 1L;
+    /** Error code for this exception */
+    public static final int ERROR_CODE = -2;
+    DuplicationException(String message) {super(message);}
+  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=627143&r1=627142&r2=627143&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Tue Feb 12 14:01:01 2008
@@ -35,6 +35,8 @@
  */
 public class TestCopyFiles extends TestCase {
   
+  /** URI of the local file system ("file:///"), used by the tests to create and copy local files. */
+  static final URI LOCAL_FS = URI.create("file:///");
+  
   private static final int NFILES = 20;
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
@@ -84,53 +86,33 @@
     long getSeed() { return seed; }
   }
 
-  public TestCopyFiles(String testName) {
-    super(testName);
-  }
-
-  
-  
-  @Override
-  protected void setUp() throws Exception {
-  }
-
-  @Override
-  protected void tearDown() throws Exception {
-  }
-  
   /** create NFILES with random names and directory hierarchies
    * with random (but reproducible) data in them.
    */
-  private static MyFile[] createFiles(String fsname, String topdir)
+  private static MyFile[] createFiles(URI fsname, String topdir)
     throws IOException {
-    MyFile[] files = new MyFile[NFILES];
-    
-    for (int idx = 0; idx < NFILES; idx++) {
-      files[idx] = new MyFile();
-    }
-    
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    FileSystem fs = FileSystem.get(fsname, new Configuration());
     Path root = new Path(topdir);
-    
-    for (int idx = 0; idx < NFILES; idx++) {
-      Path fPath = new Path(root, files[idx].getName());
-      if (!fs.mkdirs(fPath.getParent())) {
-        throw new IOException("Mkdirs failed to create " + 
-                              fPath.getParent().toString());
-      }
-      FSDataOutputStream out = fs.create(fPath);
-      byte[] toWrite = new byte[files[idx].getSize()];
-      Random rb = new Random(files[idx].getSeed());
-      rb.nextBytes(toWrite);
-      out.write(toWrite);
-      out.close();
-      toWrite = null;
+
+    // Fill all NFILES slots; createFile writes each file's seeded random data under root.
+    MyFile[] files = new MyFile[NFILES];
+    for (int i = 0; i < NFILES; i++) {
+      files[i] = createFile(root, fs);
     }
-    
     return files;
   }
 
+  /**
+   * Create one file under {@code root} on {@code fs}. The name and size come
+   * from a new MyFile, and the content is generated from that file's seed,
+   * so it can be regenerated later for verification.
+   * @param root directory in which to create the file
+   * @param fs file system on which to create the file
+   * @return the MyFile describing the created file
+   * @throws IOException if the file cannot be created or written
+   */
+  static MyFile createFile(Path root, FileSystem fs) throws IOException {
+    MyFile f = new MyFile();
+    Path p = new Path(root, f.getName());
+    byte[] toWrite = new byte[f.getSize()];
+    new Random(f.getSeed()).nextBytes(toWrite);
+    FSDataOutputStream out = fs.create(p);
+    try {
+      out.write(toWrite);
+    } finally {
+      // Close even when write() fails so the stream is not leaked.
+      out.close();
+    }
+    FileSystem.LOG.info("created: " + p + ", size=" + f.getSize());
+    return f;
+  }
+
   /** check if the files have been copied correctly. */
   private static boolean checkFiles(String fsname, String topdir, MyFile[] files) 
     throws IOException {
@@ -139,7 +121,7 @@
     FileSystem fs = FileSystem.getNamed(fsname, conf);
     Path root = new Path(topdir);
     
-    for (int idx = 0; idx < NFILES; idx++) {
+    for (int idx = 0; idx < files.length; idx++) {
       Path fPath = new Path(root, files[idx].getName());
       FSDataInputStream in = fs.open(fPath);
       byte[] toRead = new byte[files[idx].getSize()];
@@ -231,7 +213,7 @@
   
   /** copy files from local file system to local file system */
   public void testCopyFromLocalToLocal() throws Exception {
-    MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
+    MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
     ToolRunner.run(new CopyFiles(new Configuration()),
                            new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
                                          "file:///"+TEST_ROOT_DIR+"/destdat"});
@@ -250,7 +232,7 @@
       cluster = new MiniDFSCluster(conf, 2, true, null);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
-        MyFile[] files = createFiles(namenode, "/srcdat");
+        MyFile[] files = createFiles(URI.create("hdfs://"+namenode), "/srcdat");
         ToolRunner.run(new CopyFiles(conf), new String[] {
                                          "-log",
                                          "hdfs://"+namenode+"/logs",
@@ -279,7 +261,7 @@
       cluster = new MiniDFSCluster(conf, 1, true, null);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
-        MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
+        MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
         ToolRunner.run(new CopyFiles(conf), new String[] {
                                          "-log",
                                          "hdfs://"+namenode+"/logs",
@@ -308,7 +290,7 @@
       cluster = new MiniDFSCluster(conf, 1, true, null);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
-        MyFile[] files = createFiles(namenode, "/srcdat");
+        MyFile[] files = createFiles(URI.create("hdfs://"+namenode), "/srcdat");
         ToolRunner.run(new CopyFiles(conf), new String[] {
                                          "-log",
                                          "/logs",
@@ -336,7 +318,7 @@
       cluster = new MiniDFSCluster(conf, 2, true, null);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
-        MyFile[] files = createFiles(namenode, "/srcdat");
+        MyFile[] files = createFiles(URI.create("hdfs://"+namenode), "/srcdat");
         ToolRunner.run(new CopyFiles(conf), new String[] {
                                          "-p",
                                          "-log",
@@ -388,4 +370,54 @@
     }
   }
 
+  public void testCopyDuplication() throws Exception {
+    try {    
+      MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
+      ToolRunner.run(new CopyFiles(new Configuration()),
+          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+                        "file:///"+TEST_ROOT_DIR+"/src2/srcdat"});
+      assertTrue("Source and destination directories do not match.",
+                 checkFiles("local", TEST_ROOT_DIR+"/src2/srcdat", files));
+  
+      assertEquals(CopyFiles.DuplicationException.ERROR_CODE,
+          ToolRunner.run(new CopyFiles(new Configuration()),
+          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+                        "file:///"+TEST_ROOT_DIR+"/src2/srcdat",
+                        "file:///"+TEST_ROOT_DIR+"/destdat",}));
+    }
+    finally {
+      deldir("local", TEST_ROOT_DIR+"/destdat");
+      deldir("local", TEST_ROOT_DIR+"/srcdat");
+      deldir("local", TEST_ROOT_DIR+"/src2");
+    }
+  }
+
+  public void testCopySingleFile() throws Exception {
+    FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration());
+    Path root = new Path(TEST_ROOT_DIR+"/srcdat");
+    try {    
+      MyFile[] files = {createFile(root, fs)};
+      //copy a dir with a single file
+      ToolRunner.run(new CopyFiles(new Configuration()),
+          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+                        "file:///"+TEST_ROOT_DIR+"/destdat"});
+      assertTrue("Source and destination directories do not match.",
+                 checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
+      
+      //copy a single file
+      String fname = files[0].getName();
+      Path p = new Path(root, fname);
+      FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
+      ToolRunner.run(new CopyFiles(new Configuration()),
+          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname,
+                        "file:///"+TEST_ROOT_DIR+"/dest2/"+fname});
+      assertTrue("Source and destination directories do not match.",
+          checkFiles("local", TEST_ROOT_DIR+"/dest2", files));     
+    }
+    finally {
+      deldir("local", TEST_ROOT_DIR+"/destdat");
+      deldir("local", TEST_ROOT_DIR+"/dest2");
+      deldir("local", TEST_ROOT_DIR+"/srcdat");
+    }
+  }
 }



Mime
View raw message