Author: cdouglas
Date: Fri Sep 18 06:08:04 2009
New Revision: 816476
URL: http://svn.apache.org/viewvc?rev=816476&view=rev
Log:
MAPREDUCE-649. Validate a copy by comparing the source and destination
checksums in distcp. Also adds an intra-task retry mechanism for errors
detected during the copy. Contributed by Ravi Gummadi
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816476&r1=816475&r2=816476&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 18 06:08:04 2009
@@ -361,6 +361,10 @@
MAPREDUCE-916. Split the documentation to match the project split.
(Corinne Chandel via omalley)
+ MAPREDUCE-649. Validate a copy by comparing the source and destination
+ checksums in distcp. Also adds an intra-task retry mechanism for errors
+ detected during the copy. (Ravi Gummadi via cdouglas)
+
BUG FIXES
MAPREDUCE-878. Rename fair scheduler design doc to
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=816476&r1=816475&r2=816476&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Fri Sep 18 06:08:04 2009
@@ -121,6 +121,7 @@
private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
private static final int MAX_MAPS_PER_NODE = 20;
private static final int SYNC_FILE_MAX = 10;
+ private static final int DEFAULT_FILE_RETRIES = 3;
static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
static enum Options {
@@ -194,6 +195,7 @@
static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
static final String PRESERVE_STATUS_LABEL
= Options.PRESERVE_STATUS.propertyname + ".value";
+ static final String FILE_RETRIES_LABEL = NAME + ".file.retries";
private JobConf conf;
@@ -369,10 +371,94 @@
}
/**
+  * Validates a finished copy: the destination must exist, and sameFile
+  * must report it identical to the source (per the change description,
+  * sizes are compared first and then checksums, where the filesystems
+  * support checksums).
+  * @param srcstat src path and metadata
+  * @param absdst absolute destination path
+  * @return true if the destination exists and matches the source
+  * @throws IOException propagated from the filesystem queries
+  */
+ private boolean validateCopy(FileStatus srcstat, Path absdst)
+     throws IOException {
+   if (!destFileSys.exists(absdst)) {
+     return false; // nothing to compare against
+   }
+   return sameFile(srcstat.getPath().getFileSystem(job), srcstat,
+       destFileSys, absdst);
+ }
+
+ /**
+ * Increment number of files copied and bytes copied and then report status
+ */
+ void updateCopyStatus(FileStatus srcstat, Reporter reporter) {
+ copycount++;
+ reporter.incrCounter(Counter.BYTESCOPIED, srcstat.getLen());
+ reporter.incrCounter(Counter.COPY, 1);
+ updateStatus(reporter);
+ }
+
+ /**
+ * Skip copying this file if already exists at the destination.
+ * Updates counters and copy status if skipping this file.
+ * @return true if copy of this file can be skipped
+ */
+ private boolean skipCopyFile(FileStatus srcstat, Path absdst,
+ OutputCollector<WritableComparable<?>, Text> outc,
+ Reporter reporter) throws IOException {
+ if (destFileSys.exists(absdst) && !overwrite
+ && !needsUpdate(srcstat, destFileSys, absdst)) {
+ outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
+ ++skipcount;
+ reporter.incrCounter(Counter.SKIP, 1);
+ updateStatus(reporter);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Copies single file to the path specified by tmpfile.
+ * @param srcstat src path and metadata
+ * @param tmpfile temporary file to which copy is to be done
+ * @param absdst actual destination path to which copy is to be done
+ * @param reporter
+ * @return Number of bytes copied
+ */
+ private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
+ Reporter reporter) throws IOException {
+ FSDataInputStream in = null;
+ FSDataOutputStream out = null;
+ long bytesCopied = 0L;
+ try {
+ // open src file
+ in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
+ reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
+ // open tmp file
+ out = create(tmpfile, reporter, srcstat);
+ // copy file
+ for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) {
+ out.write(buffer, 0, bytesRead);
+ bytesCopied += bytesRead;
+ reporter.setStatus(
+ String.format("%.2f ", bytesCopied*100.0/srcstat.getLen())
+ + absdst + " [ " +
+ StringUtils.humanReadableInt(bytesCopied) + " / " +
+ StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
+ }
+ } finally {
+ checkAndClose(in);
+ checkAndClose(out);
+ }
+ return bytesCopied;
+ }
+
+ /**
* Copy a file to a destination.
* @param srcstat src path and metadata
* @param dstpath dst path
* @param reporter
+ * @throws IOException if copy fails(even if the validation of copy fails)
*/
private void copy(FileStatus srcstat, Path relativedst,
OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)
@@ -408,43 +494,18 @@
return;
}
- if (destFileSys.exists(absdst) && !overwrite
- && !needsUpdate(srcstat, destFileSys, absdst)) {
- outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
- ++skipcount;
- reporter.incrCounter(Counter.SKIP, 1);
- updateStatus(reporter);
+ // Can we skip copying this file ?
+ if (skipCopyFile(srcstat, absdst, outc, reporter)) {
return;
}
Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
- long cbcopied = 0L;
- FSDataInputStream in = null;
- FSDataOutputStream out = null;
- try {
- // open src file
- in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
- reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
- // open tmp file
- out = create(tmpfile, reporter, srcstat);
- // copy file
- for(int cbread; (cbread = in.read(buffer)) >= 0; ) {
- out.write(buffer, 0, cbread);
- cbcopied += cbread;
- reporter.setStatus(
- String.format("%.2f ", cbcopied*100.0/srcstat.getLen())
- + absdst + " [ " +
- StringUtils.humanReadableInt(cbcopied) + " / " +
- StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
- }
- } finally {
- checkAndClose(in);
- checkAndClose(out);
- }
+ // do the actual copy to tmpfile
+ long bytesCopied = doCopyFile(srcstat, tmpfile, absdst, reporter);
- if (cbcopied != srcstat.getLen()) {
+ if (bytesCopied != srcstat.getLen()) {
throw new IOException("File size not matched: copied "
- + bytesString(cbcopied) + " to tmpfile (=" + tmpfile
+ + bytesString(bytesCopied) + " to tmpfile (=" + tmpfile
+ ") but expected " + bytesString(srcstat.getLen())
+ " from " + srcstat.getPath());
}
@@ -458,22 +519,16 @@
}
rename(tmpfile, absdst);
- FileStatus dststat = destFileSys.getFileStatus(absdst);
- if (dststat.getLen() != srcstat.getLen()) {
+ if (!validateCopy(srcstat, absdst)) {
destFileSys.delete(absdst, false);
- throw new IOException("File size not matched: copied "
- + bytesString(dststat.getLen()) + " to dst (=" + absdst
- + ") but expected " + bytesString(srcstat.getLen())
- + " from " + srcstat.getPath());
+ throw new IOException("Validation of copy of file "
+ + srcstat.getPath() + " failed.");
}
- updateDestStatus(srcstat, dststat);
+ updateDestStatus(srcstat, destFileSys.getFileStatus(absdst));
}
// report at least once for each file
- ++copycount;
- reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
- reporter.incrCounter(Counter.COPY, 1);
- updateStatus(reporter);
+ updateCopyStatus(srcstat, reporter);
}
/** rename tmp to dst, delete dst if already exists */
@@ -503,6 +558,41 @@
return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
}
+ /**
+  * Copies a file with copy() and retries when an attempt throws
+  * IOException, which includes a failed post-copy validation. The number
+  * of attempts comes from FILE_RETRIES_LABEL (default DEFAULT_FILE_RETRIES)
+  * and is clamped to at least one. Without the clamp, a non-positive
+  * setting would skip the file and still report success.
+  *
+  * Retry attempts run with the update flag forced to true. The caller's
+  * original flag is restored on every exit path (success, final failure,
+  * or an unchecked exception), so later files handled by the same map
+  * task are not affected by a retry of this one.
+  *
+  * @param srcstat src path and metadata
+  * @param relativedst dst path relative to the destination root
+  * @param out collector passed through to copy()
+  * @param reporter progress and counter reporter
+  * @throws IOException if every attempt fails; the last failure is the cause
+  */
+ void copyWithRetries(FileStatus srcstat, Path relativedst,
+     OutputCollector<WritableComparable<?>, Text> out,
+     Reporter reporter) throws IOException {
+   // max tries to copy; values < 1 would otherwise skip the copy entirely
+   final int maxRetries =
+       Math.max(1, job.getInt(FILE_RETRIES_LABEL, DEFAULT_FILE_RETRIES));
+   // save update flag so it can be restored for later copies in this task
+   final boolean saveUpdate = update;
+   try {
+     for (int retryCnt = 1; ; retryCnt++) {
+       try {
+         // copy the file and validate copy
+         copy(srcstat, relativedst, out, reporter);
+         return; // copy successful
+       } catch (IOException e) {
+         LOG.warn("Copy of " + srcstat.getPath() + " failed.", e);
+         if (retryCnt >= maxRetries) { // no more retries... Give up
+           throw new IOException("Copy of file failed even with " + retryCnt
+               + " tries.", e);
+         }
+         LOG.info("Retrying copy of file " + srcstat.getPath());
+         update = true; // set update flag for retries
+       }
+     }
+   } finally {
+     // previously restored only on final failure, leaking update=true
+     // into later files after a successful retry
+     update = saveUpdate;
+   }
+ }
+
/** Mapper configuration.
* Extracts source and destination file system, as well as
* top-level paths on source and destination directories.
@@ -541,7 +631,7 @@
final FileStatus srcstat = value.input;
final Path relativedst = new Path(value.output);
try {
- copy(srcstat, relativedst, out, reporter);
+ copyWithRetries(srcstat, relativedst, out, reporter);
} catch (IOException e) {
++failcount;
reporter.incrCounter(Counter.FAIL, 1);
|