hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1401943 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/...
Date Thu, 25 Oct 2012 01:27:43 GMT
Author: vinodkv
Date: Thu Oct 25 01:27:43 2012
New Revision: 1401943

URL: http://svn.apache.org/viewvc?rev=1401943&view=rev
Log:
MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion requests slowly to
avoid HADOOP-8942. Contributed by Jason Lowe.
svn merge --ignore-ancestry -c 1401941 ../../trunk/

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
      - copied unchanged from r1401941, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1401943&r1=1401942&r2=1401943&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Oct 25 01:27:43 2012
@@ -48,6 +48,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
     (Vinod Kumar Vavilapalli via jlowe)
 
+    MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion
+    requests slowly to avoid HADOOP-8942. (Jason Lowe via vinodkv)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java?rev=1401943&r1=1401942&r2=1401943&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java Thu Oct 25 01:27:43 2012
@@ -27,10 +27,8 @@ import org.apache.hadoop.mapred.TaskComp
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
-@SuppressWarnings("deprecation")
 class EventFetcher<K,V> extends Thread {
   private static final long SLEEP_TIME = 1000;
-  private static final int MAX_EVENTS_TO_FETCH = 10000;
   private static final int MAX_RETRIES = 10;
   private static final int RETRY_PERIOD = 5000;
   private static final Log LOG = LogFactory.getLog(EventFetcher.class);
@@ -38,7 +36,8 @@ class EventFetcher<K,V> extends Thread {
   private final TaskAttemptID reduce;
   private final TaskUmbilicalProtocol umbilical;
   private final ShuffleScheduler<K,V> scheduler;
-  private int fromEventId = 0;
+  private int fromEventIdx = 0;
+  private int maxEventsToFetch;
   private ExceptionReporter exceptionReporter = null;
   
   private int maxMapRuntime = 0;
@@ -48,13 +47,15 @@ class EventFetcher<K,V> extends Thread {
   public EventFetcher(TaskAttemptID reduce,
                       TaskUmbilicalProtocol umbilical,
                       ShuffleScheduler<K,V> scheduler,
-                      ExceptionReporter reporter) {
+                      ExceptionReporter reporter,
+                      int maxEventsToFetch) {
     setName("EventFetcher for fetching Map Completion Events");
     setDaemon(true);    
     this.reduce = reduce;
     this.umbilical = umbilical;
     this.scheduler = scheduler;
     exceptionReporter = reporter;
+    this.maxEventsToFetch = maxEventsToFetch;
   }
 
   @Override
@@ -112,46 +113,47 @@ class EventFetcher<K,V> extends Thread {
    * from a given event ID.
    * @throws IOException
    */  
-  private int getMapCompletionEvents() throws IOException {
+  protected int getMapCompletionEvents() throws IOException {
     
     int numNewMaps = 0;
-    
-    MapTaskCompletionEventsUpdate update = 
-      umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID)
-                                       reduce.getJobID(), 
-                                       fromEventId, 
-                                       MAX_EVENTS_TO_FETCH,
-                                       (org.apache.hadoop.mapred.TaskAttemptID)
-                                         reduce);
-    TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
-    LOG.debug("Got " + events.length + " map completion events from " + 
-             fromEventId);
-      
-    // Check if the reset is required.
-    // Since there is no ordering of the task completion events at the 
-    // reducer, the only option to sync with the new jobtracker is to reset 
-    // the events index
-    if (update.shouldReset()) {
-      fromEventId = 0;
-      scheduler.resetKnownMaps();
-    }
-    
-    // Update the last seen event ID
-    fromEventId += events.length;
-    
-    // Process the TaskCompletionEvents:
-    // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
-    // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop 
-    //    fetching from those maps.
-    // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
-    //    outputs at all.
-    for (TaskCompletionEvent event : events) {
-      switch (event.getTaskStatus()) {
+    TaskCompletionEvent events[] = null;
+
+    do {
+      MapTaskCompletionEventsUpdate update =
+          umbilical.getMapCompletionEvents(
+              (org.apache.hadoop.mapred.JobID)reduce.getJobID(),
+              fromEventIdx,
+              maxEventsToFetch,
+              (org.apache.hadoop.mapred.TaskAttemptID)reduce);
+      events = update.getMapTaskCompletionEvents();
+      LOG.debug("Got " + events.length + " map completion events from " +
+               fromEventIdx);
+
+      // Check if the reset is required.
+      // Since there is no ordering of the task completion events at the
+      // reducer, the only option to sync with the new jobtracker is to reset
+      // the events index
+      if (update.shouldReset()) {
+        fromEventIdx = 0;
+        scheduler.resetKnownMaps();
+      }
+
+      // Update the last seen event ID
+      fromEventIdx += events.length;
+
+      // Process the TaskCompletionEvents:
+      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
+      //    fetching from those maps.
+      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+      //    outputs at all.
+      for (TaskCompletionEvent event : events) {
+        switch (event.getTaskStatus()) {
         case SUCCEEDED:
           URI u = getBaseURI(event.getTaskTrackerHttp());
           scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
-                                      u.toString(),
-                                      event.getTaskAttemptId());
+              u.toString(),
+              event.getTaskAttemptId());
           numNewMaps ++;
           int duration = event.getTaskRunTime();
           if (duration > maxMapRuntime) {
@@ -164,15 +166,17 @@ class EventFetcher<K,V> extends Thread {
         case OBSOLETE:
           scheduler.obsoleteMapOutput(event.getTaskAttemptId());
           LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
-                   " map-task: '" + event.getTaskAttemptId() + "'");
+              " map-task: '" + event.getTaskAttemptId() + "'");
           break;
         case TIPFAILED:
           scheduler.tipFailed(event.getTaskAttemptId().getTaskID());
           LOG.info("Ignoring output of failed map TIP: '" +  
-               event.getTaskAttemptId() + "'");
+              event.getTaskAttemptId() + "'");
           break;
+        }
       }
-    }
+    } while (events.length == maxEventsToFetch);
+
     return numNewMaps;
   }
   

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1401943&r1=1401942&r2=1401943&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Thu Oct 25 01:27:43 2012
@@ -40,9 +40,12 @@ import org.apache.hadoop.util.Progress;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class Shuffle<K, V> implements ExceptionReporter {
   private static final int PROGRESS_FREQUENCY = 2000;
+  private static final int MAX_EVENTS_TO_FETCH = 10000;
+  private static final int MIN_EVENTS_TO_FETCH = 100;
+  private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
   
   private final TaskAttemptID reduceId;
   private final JobConf jobConf;
@@ -99,9 +102,17 @@ public class Shuffle<K, V> implements Ex
   }
 
   public RawKeyValueIterator run() throws IOException, InterruptedException {
+    // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+    // on the ApplicationMaster when a thundering herd of reducers fetch events
+    // TODO: This should not be necessary after HADOOP-8942
+    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
+        MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
+    int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
+
     // Start the map-completion events fetcher thread
     final EventFetcher<K,V> eventFetcher = 
-      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this);
+      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
+          maxEventsToFetch);
     eventFetcher.start();
     
     // Start the map-output fetcher threads



Mime
View raw message