hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1077536 - /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
Date Fri, 04 Mar 2011 04:25:56 GMT
Author: omalley
Date: Fri Mar  4 04:25:56 2011
New Revision: 1077536

URL: http://svn.apache.org/viewvc?rev=1077536&view=rev
Log:
commit 9bb06f8a8c9e76b30520aae226d9c793ee29ae73
Author: Arun C Murthy <acmurthy@apache.org>
Date:   Thu Jul 8 11:39:37 2010 -0700

    MAPREDUCE-1921. Ensure exceptions during reading of input data in map tasks
    are augmented with information about the actual input file that caused the
    exception. Contributed by Krishna Ramachandran.
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1921. Ensure exceptions during reading of input data in map
    +    tasks are augmented by information about actual input file which caused
    +    the exception. (Krishna Ramachandran via acmurthy)
    +
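
The pattern the patch applies, sketched standalone for clarity (the class and
helper names below are illustrative, not the committed code; the committed
old-API reader pulls the file name from conf.get("map.input.file") rather
than from the split):

    import java.io.IOException;

    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;

    // Illustrative sketch only: wrap a read error with the input path when
    // the split is file-backed, so the failing task names the bad file.
    class InputErrorAugmenter {
      static IOException augment(IOException ioe, InputSplit split) {
        if (split instanceof FileSplit) {
          // Same message shape as the patch; the real old-API code looks
          // the path up via conf.get("map.input.file") instead.
          return new IOException("IO error in map input file "
              + ((FileSplit) split).getPath(), ioe);
        }
        return ioe; // non-file splits pass through unchanged
      }
    }

Because the original IOException is passed as the cause, the full stack
trace of the underlying failure is preserved alongside the file name.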

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1077536&r1=1077535&r2=1077536&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar  4 04:25:56 2011
@@ -160,16 +160,20 @@ class MapTask extends Task {
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
+    private InputSplit split;
     private TaskReporter reporter;
     private long beforePos = -1;
     private long afterPos = -1;
-    
-    TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter) 
+
+    TrackedRecordReader(InputSplit split, JobConf job, RecordReader<K,V> raw, 
+        TaskReporter reporter) 
       throws IOException{
       rawIn = raw;
       inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
       inputByteCounter = reporter.getCounter(MAP_INPUT_BYTES);
       this.reporter = reporter;
+      this.split = split;
+      conf = job;
     }
 
     public K createKey() {
@@ -196,13 +200,22 @@ class MapTask extends Task {
      
     protected synchronized boolean moveToNext(K key, V value)
       throws IOException {
-      reporter.setProgress(getProgress());
-      beforePos = getPos();
-      boolean ret = rawIn.next(key, value);
-      afterPos = getPos();
+      boolean ret = false;
+      try {
+        reporter.setProgress(getProgress());
+        beforePos = getPos();
+        ret = rawIn.next(key, value);
+        afterPos = getPos();
+      } catch (IOException ioe) {
+        if (split instanceof FileSplit) {
+          LOG.error("IO error in map input file " + conf.get("map.input.file"));
+          throw new IOException("IO error in map input file "
+              + conf.get("map.input.file"), ioe);
+        }
+      }
       return ret;
     }
-    
+
     public long getPos() throws IOException { return rawIn.getPos(); }
     public void close() throws IOException { rawIn.close(); }
     public float getProgress() throws IOException {
@@ -227,7 +240,7 @@ class MapTask extends Task {
     
     SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
                          TaskReporter reporter) throws IOException{
-      super(raw, reporter);
+      super(null, conf, raw, reporter);
       this.umbilical = umbilical;
       this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
       this.toWriteSkipRecs = toWriteSkipRecs() &&  
@@ -363,7 +376,7 @@ class MapTask extends Task {
       job.getInputFormat().getRecordReader(inputSplit, job, reporter);
     RecordReader<INKEY,INVALUE> in = isSkipping() ? 
         new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
-        new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
+        new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, rawIn, reporter);
     job.setBoolean("mapred.skip.on", isSkipping());
 
 
@@ -407,11 +420,14 @@ class MapTask extends Task {
     private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
     private final TaskReporter reporter;
+    private org.apache.hadoop.mapreduce.InputSplit inputSplit;
     
-    NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
-                            TaskReporter reporter) {
+    NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
+        org.apache.hadoop.mapreduce.RecordReader<K,V> real,
+        TaskReporter reporter) {
       this.real = real;
       this.reporter = reporter;
+      this.inputSplit = split;
       this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
     }
 
@@ -444,11 +460,22 @@ class MapTask extends Task {
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
-      boolean result = real.nextKeyValue();
-      if (result) {
-        inputRecordCounter.increment(1);
+      boolean result = false;
+      try {
+        result = real.nextKeyValue();
+        if (result) {
+          inputRecordCounter.increment(1);
+        }
+        reporter.setProgress(getProgress());
+      } catch (IOException ioe) {
+        if (inputSplit instanceof FileSplit) {
+          FileSplit fileSplit = (FileSplit) inputSplit;
+          LOG.error("IO error in map input file "
+              + fileSplit.getPath().toString());
+          throw new IOException("IO error in map input file "
+              + fileSplit.getPath().toString(), ioe);
+        }
       }
-      reporter.setProgress(getProgress());
       return result;
     }
   }
@@ -604,7 +631,8 @@ class MapTask extends Task {
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>
-          (inputFormat.createRecordReader(split, taskContext), reporter);
+          (split, inputFormat.createRecordReader(split, taskContext), 
+              reporter);
     
     job.setBoolean("mapred.skip.on", isSkipping());
     org.apache.hadoop.mapreduce.RecordWriter output = null;
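
A note on the two code paths above: the old-API TrackedRecordReader recovers
the file name from the job configuration ("map.input.file"), while the
new-API NewTrackingRecordReader casts its stored split to FileSplit and
calls getPath(). A minimal sketch of the new-API variant (class and method
names are illustrative, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Illustrative: the new-API split carries its path directly, so no
    // job-configuration lookup is needed.
    class NewApiInputErrorAugmenter {
      static IOException augment(IOException ioe, InputSplit split) {
        if (split instanceof FileSplit) {
          return new IOException("IO error in map input file "
              + ((FileSplit) split).getPath(), ioe);
        }
        return ioe; // non-file splits pass through unchanged
      }
    }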


