hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1379654 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: CHANGES.txt.MR-3902 hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
Date Fri, 31 Aug 2012 22:41:46 GMT
Author: sseth
Date: Fri Aug 31 22:41:45 2012
New Revision: 1379654

URL: http://svn.apache.org/viewvc?rev=1379654&view=rev
Log:
MAPREDUCE-4624. Reduce scheduling fixes, factor in MR-4437 (sseth)

Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1379654&r1=1379653&r2=1379654&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Aug 31 22:41:45 2012
@@ -8,3 +8,5 @@ Branch MR-3902
   MAPREDUCE-4609. RMContainerAllocator scheduler interval should be configurable. (Tsuyoshi OZAWA via sseth)
 
   MAPREDUCE-4625. Statistics logging in the AM scheduler. (sseth)
+
+  MAPREDUCE-4624. Reduce scheduling fixes, factor in MR-4437. (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1379654&r1=1379653&r2=1379654&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Aug 31 22:41:45 2012
@@ -123,6 +123,9 @@ public class RMContainerAllocator extend
   Timer scheduleTimer;
   ScheduleTimerTask scheduleTimerTask;
   private long lastScheduleTime = 0l;
+  private int lastCompletedTasks = 0;
+  private int completedMaps = 0;
+  private int completedReduces = 0;
   
   /*
   Vocabulary Used: 
@@ -287,13 +290,9 @@ public class RMContainerAllocator extend
 
     @Override
     public void run() {
-      // TODO XXX XXX: Reduces are not being shceduled. Forcing them via this for now. Figure
out when reduce schedule should be recomputed.
-      // TODO XXX. Does this need to be stopped before the service stop()
       if (clock.getTime() - lastScheduleTime > scheduleInterval && shouldRun)
{
         handle(new AMSchedulerEventContainersAllocated(
-            Collections.<ContainerId> emptyList(), true));
-        // Sending a false. Just try to flush available containers.
-        // The decision to schedule reduces may need to be based on available containers.
+            Collections.<ContainerId> emptyList(), false));
       }
     }
 
@@ -330,13 +329,12 @@ public class RMContainerAllocator extend
   }
 
   protected synchronized void handleEvent(AMSchedulerEvent sEvent) {
-
+    // Recalculating reduce schedule here since it's required for most events.
+    recalculateReduceSchedule = true;
     LOG.info("XXX: Processing the event " + sEvent.toString());
     switch(sEvent.getType()) {
-    // TODO XXX: recalculateReduceSchedule may need to bet set on other events - not just
containerAllocated.
     case S_TA_LAUNCH_REQUEST:
       handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent);
-      // Add to queue of pending tasks.
       break;
     case S_TA_STOP_REQUEST: //Effectively means a failure.
       handleTaStopRequest((AMSchedulerTAStopRequestEvent)sEvent);
@@ -413,11 +411,6 @@ public class RMContainerAllocator extend
   }
   
   private void handleTaSucceededRequest(AMSchedulerTASucceededEvent event) {
-    // TODO XXX Part of re-use.
-    // TODO XXX Also may change after state machines are finalized.
-    // XXX: Maybe send the request to the task before sending it to the scheduler - the scheduler
can then
-    //  query the task to figure out whether the taskAttempt is the successfulAttempt - and
whether to count it towards the reduce ramp up.
-    //  Otherwise -> Job.getCompletedMaps() - will give an out of date picture, since
the scheduler event will always be generated before the TaskCompleted event to the job.
     attemptToLaunchRequestMap.remove(event.getAttemptID());
     ContainerId containerId = assignedRequests.remove(event.getAttemptID());
     if (containerId != null) { // TODO Should not be null. Confirm.
@@ -435,15 +428,20 @@ public class RMContainerAllocator extend
 
   // TODO XXX: Deal with node blacklisting.
   
-  private void handleContainersAllocated(AMSchedulerEventContainersAllocated event) {
-
+  private void handleContainersAllocated(
+      AMSchedulerEventContainersAllocated event) {
     availableContainerIds.addAll(event.getContainerIds());
-  
+
+    completedMaps = getJob().getCompletedMaps();
+    completedReduces = getJob().getCompletedReduces();
+    int completedTasks = completedMaps + completedReduces;
+
+    if (lastCompletedTasks != completedTasks) {
+      recalculateReduceSchedule = true;
+      lastCompletedTasks = completedTasks;
+    }
 
     if (event.didHeadroomChange() || event.getContainerIds().size() > 0) {
-      // TODO XXX -> recaulculateReduceSchedule in case of released containers
-      // .... would imply CONTAINER_COMPLETED messages are required by the Scheduler.
-      // ContainerReleased == headroomChange ?
       recalculateReduceSchedule = true;
     }
     schedule();
@@ -697,8 +695,8 @@ public class RMContainerAllocator extend
         " ScheduledReduces:" + scheduledRequests.reduces.size() +
         " AssignedMaps:" + assignedRequests.maps.size() + 
         " AssignedReduces:" + assignedRequests.reduces.size() +
-        " completedMaps:" + getJob().getCompletedMaps() + 
-        " completedReduces:" + getJob().getCompletedReduces() +
+        " completedMaps:" + completedMaps + 
+        " completedReduces:" + completedReduces +
         " containersAllocated:" + containersAllocated + //Not super useful.
         " newContainersAllocated: " + newContainerAllocations +
         " existingContainersAllocated: " + existingContainerAllocations +



Mime
View raw message