Author: tucu
Date: Mon Jul 9 19:20:37 2012
New Revision: 1359348
URL: http://svn.apache.org/viewvc?rev=1359348&view=rev
Log:
MAPREDUCE-3993. Graceful handling of codec errors during decompression (kkambatl via tucu)
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/IOUtils.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1359348&r1=1359347&r2=1359348&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Jul 9 19:20:37 2012
@@ -60,6 +60,9 @@ Release 1.2.0 - unreleased
MAPREDUCE-4359. Potential deadlock in Counters. (tomwhite)
+ MAPREDUCE-3993. Graceful handling of codec errors during decompression
+ (kkambatl via tucu)
+
Release 1.1.0 - unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/IOUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/IOUtils.java?rev=1359348&r1=1359347&r2=1359348&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/IOUtils.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/IOUtils.java Mon Jul 9 19:20:37 2012
@@ -138,6 +138,28 @@ public class IOUtils {
}
}
+ /**
+ * Utility wrapper for reading from {@link InputStream}. It catches any errors
+ * thrown by the underlying stream (either IO or decompression-related), and
+ * re-throws as an IOException.
+ *
+ * @param is - InputStream to be read from
+ * @param buf - buffer the data is read into
+ * @param off - offset within buf
+ * @param len - amount of data to be read
+ * @return number of bytes read
+ */
+ public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
+ int off, int len) throws IOException {
+ try {
+ return is.read(buf, off, len);
+ } catch (IOException ie) {
+ throw ie;
+ } catch (Throwable t) {
+ throw new IOException("Error while reading compressed data", t);
+ }
+ }
+
/** Reads len bytes in a loop.
* @param in The InputStream to read from
* @param buf The buffer to fill
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=1359348&r1=1359347&r2=1359348&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/IFile.java Mon Jul 9 19:20:37 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -325,7 +326,8 @@ class IFile {
private int readData(byte[] buf, int off, int len) throws IOException {
int bytesRead = 0;
while (bytesRead < len) {
- int n = in.read(buf, off+bytesRead, len-bytesRead);
+ int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+ len - bytesRead);
if (n < 0) {
return bytesRead;
}
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1359348&r1=1359347&r2=1359348&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Jul 9 19:20:37 2012
@@ -1702,15 +1702,16 @@ class ReduceTask extends Task {
int bytesRead = 0;
try {
- int n = input.read(shuffleData, 0, shuffleData.length);
+ int n = IOUtils.wrappedReadForCompressedData(input, shuffleData, 0,
+ shuffleData.length);
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
// indicate we're making progress
reporter.progress();
- n = input.read(shuffleData, bytesRead,
- (shuffleData.length-bytesRead));
+ n = IOUtils.wrappedReadForCompressedData(input, shuffleData,
+ bytesRead, shuffleData.length - bytesRead);
}
if (LOG.isDebugEnabled()) {
|