hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r816476 - in /hadoop/mapreduce/trunk: CHANGES.txt src/tools/org/apache/hadoop/tools/DistCp.java
Date Fri, 18 Sep 2009 06:08:05 GMT
Author: cdouglas
Date: Fri Sep 18 06:08:04 2009
New Revision: 816476

URL: http://svn.apache.org/viewvc?rev=816476&view=rev
Log:
MAPREDUCE-649. Validate a copy by comparing the source and destination
checksums in distcp. Also adds an intra-task retry mechanism for errors
detected during the copy. Contributed by Ravi Gummadi

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816476&r1=816475&r2=816476&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 18 06:08:04 2009
@@ -361,6 +361,10 @@
     MAPREDUCE-916. Split the documentation to match the project split.
     (Corinne Chandel via omalley)
 
+    MAPREDUCE-649. Validate a copy by comparing the source and destination
+    checksums in distcp. Also adds an intra-task retry mechanism for errors
+    detected during the copy. (Ravi Gummadi via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=816476&r1=816475&r2=816476&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Fri Sep 18 06:08:04 2009
@@ -121,6 +121,7 @@
   private static final long BYTES_PER_MAP =  256 * 1024 * 1024;
   private static final int MAX_MAPS_PER_NODE = 20;
   private static final int SYNC_FILE_MAX = 10;
+  private static final int DEFAULT_FILE_RETRIES = 3;
 
   static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
   static enum Options {
@@ -194,6 +195,7 @@
   static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
   static final String PRESERVE_STATUS_LABEL
       = Options.PRESERVE_STATUS.propertyname + ".value";
+  static final String FILE_RETRIES_LABEL = NAME + ".file.retries";
 
   private JobConf conf;
 
@@ -369,10 +371,94 @@
     }
 
     /**
+     * Validates copy by checking the sizes of files first and then
+     * checksums, if the filesystems support checksums.
+     * @param srcstat src path and metadata
+     * @param absdst dst path
+     * @return true if src & destination files are same
+     */
+    private boolean validateCopy(FileStatus srcstat, Path absdst)
+            throws IOException {
+      if (destFileSys.exists(absdst)) {
+        if (sameFile(srcstat.getPath().getFileSystem(job), srcstat,
+            destFileSys, absdst)) {
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    /**
+     * Increment number of files copied and bytes copied and then report status
+     */
+    void updateCopyStatus(FileStatus srcstat, Reporter reporter) {
+      copycount++;
+      reporter.incrCounter(Counter.BYTESCOPIED, srcstat.getLen());
+      reporter.incrCounter(Counter.COPY, 1);
+      updateStatus(reporter);
+    }
+    
+    /**
+     * Skip copying this file if already exists at the destination.
+     * Updates counters and copy status if skipping this file.
+     * @return true    if copy of this file can be skipped
+     */
+    private boolean skipCopyFile(FileStatus srcstat, Path absdst,
+                            OutputCollector<WritableComparable<?>, Text> outc,
+                            Reporter reporter) throws IOException {
+      if (destFileSys.exists(absdst) && !overwrite
+          && !needsUpdate(srcstat, destFileSys, absdst)) {
+        outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
+        ++skipcount;
+        reporter.incrCounter(Counter.SKIP, 1);
+        updateStatus(reporter);
+        return true;
+      }
+      return false;
+    }
+    
+    /**
+     * Copies single file to the path specified by tmpfile.
+     * @param srcstat  src path and metadata
+     * @param tmpfile  temporary file to which copy is to be done
+     * @param absdst   actual destination path to which copy is to be done
+     * @param reporter
+     * @return Number of bytes copied
+     */
+    private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
+                            Reporter reporter) throws IOException {
+      FSDataInputStream in = null;
+      FSDataOutputStream out = null;
+      long bytesCopied = 0L;
+      try {
+        // open src file
+        in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
+        reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
+        // open tmp file
+        out = create(tmpfile, reporter, srcstat);
+        // copy file
+        for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) {
+          out.write(buffer, 0, bytesRead);
+          bytesCopied += bytesRead;
+          reporter.setStatus(
+              String.format("%.2f ", bytesCopied*100.0/srcstat.getLen())
+              + absdst + " [ " +
+              StringUtils.humanReadableInt(bytesCopied) + " / " +
+              StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
+        }
+      } finally {
+        checkAndClose(in);
+        checkAndClose(out);
+      }
+      return bytesCopied;
+    }
+    
+    /**
      * Copy a file to a destination.
      * @param srcstat src path and metadata
      * @param dstpath dst path
      * @param reporter
+     * @throws IOException if copy fails(even if the validation of copy fails)
      */
     private void copy(FileStatus srcstat, Path relativedst,
         OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)
@@ -408,43 +494,18 @@
         return;
       }
 
-      if (destFileSys.exists(absdst) && !overwrite
-          && !needsUpdate(srcstat, destFileSys, absdst)) {
-        outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
-        ++skipcount;
-        reporter.incrCounter(Counter.SKIP, 1);
-        updateStatus(reporter);
+      // Can we skip copying this file ?
+      if (skipCopyFile(srcstat, absdst, outc, reporter)) {
         return;
       }
 
       Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
-      long cbcopied = 0L;
-      FSDataInputStream in = null;
-      FSDataOutputStream out = null;
-      try {
-        // open src file
-        in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
-        reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
-        // open tmp file
-        out = create(tmpfile, reporter, srcstat);
-        // copy file
-        for(int cbread; (cbread = in.read(buffer)) >= 0; ) {
-          out.write(buffer, 0, cbread);
-          cbcopied += cbread;
-          reporter.setStatus(
-              String.format("%.2f ", cbcopied*100.0/srcstat.getLen())
-              + absdst + " [ " +
-              StringUtils.humanReadableInt(cbcopied) + " / " +
-              StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
-        }
-      } finally {
-        checkAndClose(in);
-        checkAndClose(out);
-      }
+      // do the actual copy to tmpfile
+      long bytesCopied = doCopyFile(srcstat, tmpfile, absdst, reporter);
 
-      if (cbcopied != srcstat.getLen()) {
+      if (bytesCopied != srcstat.getLen()) {
         throw new IOException("File size not matched: copied "
-            + bytesString(cbcopied) + " to tmpfile (=" + tmpfile
+            + bytesString(bytesCopied) + " to tmpfile (=" + tmpfile
             + ") but expected " + bytesString(srcstat.getLen()) 
             + " from " + srcstat.getPath());        
       }
@@ -458,22 +519,16 @@
         }
         rename(tmpfile, absdst);
 
-        FileStatus dststat = destFileSys.getFileStatus(absdst);
-        if (dststat.getLen() != srcstat.getLen()) {
+        if (!validateCopy(srcstat, absdst)) {
           destFileSys.delete(absdst, false);
-          throw new IOException("File size not matched: copied "
-              + bytesString(dststat.getLen()) + " to dst (=" + absdst
-              + ") but expected " + bytesString(srcstat.getLen()) 
-              + " from " + srcstat.getPath());        
+          throw new IOException("Validation of copy of file "
+              + srcstat.getPath() + " failed.");
         } 
-        updateDestStatus(srcstat, dststat);
+        updateDestStatus(srcstat, destFileSys.getFileStatus(absdst));
       }
 
       // report at least once for each file
-      ++copycount;
-      reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
-      reporter.incrCounter(Counter.COPY, 1);
-      updateStatus(reporter);
+      updateCopyStatus(srcstat, reporter);
     }
     
     /** rename tmp to dst, delete dst if already exists */
@@ -503,6 +558,41 @@
       return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
     }
 
+    /**
+     * Copies a file and validates the copy by checking the checksums.
+     * If validation fails, retries (max number of tries is distcp.file.retries)
+     * to copy the file.
+     */
+    void copyWithRetries(FileStatus srcstat, Path relativedst,
+                         OutputCollector<WritableComparable<?>, Text> out,
+                         Reporter reporter) throws IOException {
+
+      // max tries to copy when validation of copy fails
+      final int maxRetries = job.getInt(FILE_RETRIES_LABEL, DEFAULT_FILE_RETRIES);
+      // save update flag for later copies within the same map task
+      final boolean saveUpdate = update;
+      
+      int retryCnt = 1;
+      for (; retryCnt <= maxRetries; retryCnt++) {
+        try {
+          //copy the file and validate copy
+          copy(srcstat, relativedst, out, reporter);
+          break;// copy successful
+        } catch (IOException e) {
+          LOG.warn("Copy of " + srcstat.getPath() + " failed.", e);
+          if (retryCnt < maxRetries) {// copy failed and need to retry
+            LOG.info("Retrying copy of file " + srcstat.getPath());
+            update = true; // set update flag for retries
+          }
+          else {// no more retries... Give up
+            update = saveUpdate;
+            throw new IOException("Copy of file failed even with " + retryCnt
+                                  + " tries.", e);
+          }
+        }
+      }
+    }
+    
     /** Mapper configuration.
      * Extracts source and destination file system, as well as
      * top-level paths on source and destination directories.
@@ -541,7 +631,7 @@
       final FileStatus srcstat = value.input;
       final Path relativedst = new Path(value.output);
       try {
-        copy(srcstat, relativedst, out, reporter);
+        copyWithRetries(srcstat, relativedst, out, reporter);
       } catch (IOException e) {
         ++failcount;
         reporter.incrCounter(Counter.FAIL, 1);



Mime
View raw message