From acmur...@apache.org
Subject svn commit: r1471557 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/...
Date Wed, 24 Apr 2013 17:39:29 GMT
Author: acmurthy
Date: Wed Apr 24 17:39:28 2013
New Revision: 1471557

URL: http://svn.apache.org/r1471557
Log:
Merge -c 1471556 from trunk to branch-2 to fix MAPREDUCE-4737. Ensure that the mapreduce APIs
are semantically consistent with the mapred API w.r.t. Mapper.cleanup and Reducer.cleanup, in
the sense that cleanup is now called even if there is an error. The old mapred API already
ensures that Mapper.close and Reducer.close are invoked during error handling. Note that this
is an incompatible change; however, end-users can override Mapper.run and Reducer.run to get
the old (inconsistent) behaviour. Contributed by Arun C. Murthy.
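
For end-users who need the old semantics, here is a minimal sketch of overriding Mapper.run
to restore the pre-MAPREDUCE-4737 behaviour; the subclass name and key/value types below are
illustrative, not part of this commit:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical subclass: reinstates the old behaviour in which cleanup()
    // runs only when the map loop finishes without throwing.
    public class LegacyCleanupMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {
      @Override
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context); // skipped if map() throws, matching the old semantics
      }
    }

An analogous override of Reducer.run restores the old reduce-side behaviour.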

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
      - copied unchanged from r1471556, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Apr 24 17:39:28 2013
@@ -180,6 +180,14 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5146. application classloader may be used too early to load
     classes. (Sangjin Lee via tomwhite)
 
+    MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+    with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that
+    cleanup is now called even if there is an error. The old mapred API
+    already ensures that Mapper.close and Reducer.close are invoked during
+    error handling. Note that it is an incompatible change, however end-users 
+    can override Mapper.run and Reducer.run to get the old (inconsistent) 
+    behaviour. (acmurthy)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Apr 24 17:39:28 2013
@@ -434,10 +434,15 @@ public class MapTask extends Task {
       }
       statusUpdate(umbilical);
       collector.flush();
-    } finally {
-      //close
-      in.close();                               // close input
+      
+      in.close();
+      in = null;
+      
       collector.close();
+      collector = null;
+    } finally {
+      closeQuietly(in);
+      closeQuietly(collector);
     }
   }
 
@@ -753,13 +758,20 @@ public class MapTask extends Task {
           new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
               mapContext);
 
-    input.initialize(split, mapperContext);
-    mapper.run(mapperContext);
-    mapPhase.complete();
-    setPhase(TaskStatus.Phase.SORT);
-    statusUpdate(umbilical);
-    input.close();
-    output.close(mapperContext);
+    try {
+      input.initialize(split, mapperContext);
+      mapper.run(mapperContext);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
+      input.close();
+      input = null;
+      output.close(mapperContext);
+      output = null;
+    } finally {
+      closeQuietly(input);
+      closeQuietly(output, mapperContext);
+    }
   }
 
   class DirectMapOutputCollector<K, V>
@@ -1949,4 +1961,55 @@ public class MapTask extends Task {
     }
   }
 
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void closeQuietly(RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+  
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+  
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
+      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+          mapperContext) {
+    if (c != null) {
+      try {
+        c.close(mapperContext);
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 24 17:39:28 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -428,14 +429,15 @@ public class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+    RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
         this, job, reporter, finalName);
-
+    final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
+    
     OutputCollector<OUTKEY,OUTVALUE> collector = 
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
-          out.write(key, value);
+          finalOut.write(key, value);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -466,20 +468,14 @@ public class ReduceTask extends Task {
         values.informReduceProgress();
       }
 
-      //Clean up: repeated in catch block below
       reducer.close();
-      out.close(reporter);
-      //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-        
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
+      reducer = null;
       
-      throw ioe;
+      out.close(reporter);
+      out = null;
+    } finally {
+      IOUtils.cleanup(LOG, reducer);
+      closeQuietly(out, reporter);
     }
   }
 
@@ -645,7 +641,21 @@ public class ReduceTask extends Task {
                                                committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      trackedRW.close(reducerContext);
+    }
+  }
+  
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
+    if (c != null) {
+      try {
+        c.close(r);
+      } catch (Exception e) {
+        LOG.info("Exception in closing " + c, e);
+      }
+    }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java Wed Apr 24 17:39:28 2013
@@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYO
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKeyValue()) {
-      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    try {
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java?rev=1471557&r1=1471556&r2=1471557&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java Wed Apr 24 17:39:28 2013
@@ -166,14 +166,17 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKey()) {
-      reduce(context.getCurrentKey(), context.getValues(), context);
-      // If a back up store is used, reset it
-      Iterator<VALUEIN> iter = context.getValues().iterator();
-      if(iter instanceof ReduceContext.ValueIterator) {
-        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
+    try {
+      while (context.nextKey()) {
+        reduce(context.getCurrentKey(), context.getValues(), context);
+        // If a back up store is used, reset it
+        Iterator<VALUEIN> iter = context.getValues().iterator();
+        if(iter instanceof ReduceContext.ValueIterator) {
+          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
+        }
       }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }


