hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1379650 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main...
Date Fri, 31 Aug 2012 22:39:10 GMT
Author: sseth
Date: Fri Aug 31 22:39:10 2012
New Revision: 1379650

URL: http://svn.apache.org/viewvc?rev=1379650&view=rev
Log:
MAPREDUCE-4625. Statistics logging in the AM scheduler.

Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Aug 31
22:39:10 2012
@@ -6,3 +6,5 @@ Branch MR-3902
   MAPREDUCE-4599. Prevent container launches on blacklisted hosts. (Tsuyoshi OZAWA via sseth)
 
   MAPREDUCE-4609. RMContainerAllocator scheduler interval should be configurable. (Tsuyoshi OZAWA via sseth)
+
+  MAPREDUCE-4625. Statistics logging in the AM scheduler. (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
Fri Aug 31 22:39:10 2012
@@ -444,6 +444,7 @@ public class MRAppMaster extends Composi
     // second timeout before the exit.
     // TODO XXX: Modify TaskAttemptCleaner to empty its queue while stopping.
     public void handle(JobFinishEvent event) {
+      LOG.info("Handling JobFinished Event");
       AMShutdownRunnable r = new AMShutdownRunnable();
       Thread t = new Thread(r, "AMShutdownThread");
       t.start();
@@ -506,6 +507,8 @@ public class MRAppMaster extends Composi
       @Override
       public void run() {
         maybeSendJobEndNotification();
+        // TODO XXX Add a timeout.
+        LOG.info("Waiting for all containers and TaskAttempts to complete");
         while (!allContainersComplete() || !allTaskAttemptsComplete()) {
           try {
             synchronized(this) {
@@ -516,6 +519,7 @@ public class MRAppMaster extends Composi
             break;
           }
         }
+        LOG.info("All Containers and TaskAttempts Complete. Stopping services");
         stopAM();
         LOG.info("AM Shutdown Thread Completing");
       }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventType.java
Fri Aug 31 22:39:10 2012
@@ -25,8 +25,7 @@ public enum TaskAttemptEventType {
 
   //Producer:Task, Speculator
   TA_SCHEDULE,
-  TA_RESCHEDULE,
-  
+
   //Producer: TaskAttemptListener
   TA_STARTED_REMOTELY,
   TA_STATUS_UPDATE,

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
Fri Aug 31 22:39:10 2012
@@ -591,7 +591,8 @@ public abstract class TaskImpl implement
 
     ++numberUncompletedAttempts;
     //schedule the nextAttemptNumber
-    eventHandler.handle(new TaskAttemptScheduleEvent(attempt.getID(), TaskAttemptEventType.TA_SCHEDULE, failedAttempts > 0));
+    eventHandler.handle(new TaskAttemptScheduleEvent(attempt.getID(),
+        TaskAttemptEventType.TA_SCHEDULE, failedAttempts > 0));
     
   }
 

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
Fri Aug 31 22:39:10 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.mapreduce.v2.app2.rm;
 
 import java.util.List;
@@ -9,6 +27,10 @@ public class AMSchedulerEventContainersA
   private final List<ContainerId> containerIds;
   private final boolean headRoomChanged;
 
+  // TODO XXX: Maybe distinguish between newly allocated containers and 
+  // existing containers being re-used.
+  // headRoomChanged is a strange API - making an assumption about how the
+  // scheduler will use this info.
   public AMSchedulerEventContainersAllocated(List<ContainerId> containerIds,
       boolean headRoomChanged) {
     super(AMSchedulerEventType.S_CONTAINERS_ALLOCATED);

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
Fri Aug 31 22:39:10 2012
@@ -6,21 +6,17 @@ public enum AMSchedulerEventType {
   S_TA_STOP_REQUEST, // Maybe renamed to S_TA_END / S_TA_ABNORMAL_END
   S_TA_SUCCEEDED,
   S_TA_ENDED,
-  
+
   //Producer: RMCommunicator
   S_CONTAINERS_ALLOCATED,
-  
+
   //Producer: Container. (Maybe RMCommunicator)
   S_CONTAINER_COMPLETED,
-  
-  // Add events for nodes being blacklisted.
-  
-  // TODO XXX
-  //Producer: RMCommunicator. May not be needed.
-//  S_CONTAINER_COMPLETED,
-  
-  //Producer: RMComm
-//  S_NODE_UNHEALTHY,
-//  S_NODE_HEALTHY,
-  
+
+  //Producer: Node
+  S_NODE_BLACKLISTED,
+  S_NODE_UNHEALTHY,
+  S_NODE_HEALTHY
+  // The scheduler should have a way of knowing about unusable nodes. Acting on
+  // this information to change requests etc is scheduler specific.
 }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
Fri Aug 31 22:39:10 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
@@ -158,6 +159,8 @@ public class RMContainerAllocator extend
       attemptToLaunchRequestMap = new HashMap<TaskAttemptId, AMSchedulerTALaunchRequestEvent>();
   
   private int containersAllocated = 0;
+  private int newContainerAllocations = 0;
+  private int existingContainerAllocations = 0;
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
   private int rackLocalAssigned = 0;
@@ -177,7 +180,6 @@ public class RMContainerAllocator extend
   BlockingQueue<AMSchedulerEvent> eventQueue
     = new LinkedBlockingQueue<AMSchedulerEvent>();
 
-  @SuppressWarnings("rawtypes")
   public RMContainerAllocator(RMContainerRequestor requestor,
       AppContext appContext) {
     super("RMContainerAllocator");
@@ -211,7 +213,10 @@ public class RMContainerAllocator extend
         MRJobConfig.MR_AM_SCHEDULER_INTERVAL,
         MRJobConfig.DEFAULT_MR_AM_SCHEDULER_INTERVAL);
     shouldReUse = conf.getBoolean("am.scheduler.shouldReuse", false);
-    LOG.info("XXX: ShouldReUse: " + shouldReUse);
+    LOG.info("AMSchedulerConfiguration: " + "ReUseEnabled: " + shouldReUse
+        + ", reduceSlowStart: " + reduceSlowStart + ", maxReduceRampupLimit: "
+        + maxReduceRampupLimit + ", maxReducePreemptionLimit: "
+        + maxReducePreemptionLimit);
     RackResolver.init(conf);
   }
 
@@ -249,7 +254,8 @@ public class RMContainerAllocator extend
     
     scheduleTimer = new Timer("AMSchedulerTimer", true);
     scheduleTimerTask = new ScheduleTimerTask();
-    scheduleTimer.scheduleAtFixedRate(scheduleTimerTask, scheduleInterval, scheduleInterval);
+    scheduleTimer.scheduleAtFixedRate(scheduleTimerTask, scheduleInterval,
+        scheduleInterval);
     this.job = appContext.getJob(jobId);
     
     super.start();
@@ -260,10 +266,10 @@ public class RMContainerAllocator extend
     this.stopEventHandling = true;
     if (eventHandlingThread != null)
       eventHandlingThread.interrupt();
-    super.stop();
     if (scheduleTimerTask != null) {
       scheduleTimerTask.stop();
     }
+    super.stop();
     LOG.info("Final Scheduler Stats: " + getStat());
   }
   
@@ -322,11 +328,10 @@ public class RMContainerAllocator extend
       throw new YarnException(e);
     }
   }
-  
-  // TODO XXX: Before and after makeRemoteRequest statistics.
 
-protected synchronized void handleEvent(AMSchedulerEvent sEvent) {
-    
+  protected synchronized void handleEvent(AMSchedulerEvent sEvent) {
+
+    LOG.info("XXX: Processing the event " + sEvent.toString());
     switch(sEvent.getType()) {
     // TODO XXX: recalculateReduceSchedule may need to be set on other events - not just containerAllocated.
     case S_TA_LAUNCH_REQUEST:
@@ -345,21 +350,22 @@ protected synchronized void handleEvent(
     case S_CONTAINERS_ALLOCATED:
       handleContainersAllocated((AMSchedulerEventContainersAllocated) sEvent);
       break;
-    // No HEALTH_CHANGE events. Not modifying the table based on these.
-    case S_CONTAINER_COMPLETED: // Maybe use this to reschedule reduces ?
+    case S_CONTAINER_COMPLETED: //Nothing specific to be done in this scheduler.
       break;
-    // Node State Change Event. May want to withdraw requests related to the node, and put
-    // in fresh requests.
-      
-    // Similarly for the case where a  node gets blacklisted.
-    default:
+    case S_NODE_BLACKLISTED:
+      // TODO XXX Withdraw requests related to this node and place new ones.
+      break;
+    case S_NODE_UNHEALTHY:
+      // Ignore. RM will not allocate containers on this node.
+      break;
+    case S_NODE_HEALTHY:
+      // Ignore. RM will start allocating containers if there are pending requests.
       break;
     }
   }
 
   private void handleTaLaunchRequest(AMSchedulerTALaunchRequestEvent event) {
     // Add to queue of pending tasks.
-    LOG.info("Processing the event " + event.toString());
     attemptToLaunchRequestMap.put(event.getAttemptID(), event);
     if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
       mapResourceReqt = maybeComputeNormalizedRequestForType(event,
@@ -381,10 +387,8 @@ protected synchronized void handleEvent(
   }
 
   private void handleTaStopRequest(AMSchedulerTAStopRequestEvent event) {
-    LOG.info("Processing the event " + event.toString());
     TaskAttemptId aId = event.getAttemptID();
     attemptToLaunchRequestMap.remove(aId);
-    // XXX Not very efficient. List / check type.
     boolean removed = pendingReduces.remove(aId);
     if (!removed) {
       removed = scheduledRequests.remove(aId);
@@ -393,11 +397,16 @@ protected synchronized void handleEvent(
         ContainerId containerId = assignedRequests.getContainerId(aId);
         if (containerId != null) {
           // Ask the container to stop.
-          sendEvent(new AMContainerEvent(containerId, AMContainerEventType.C_STOP_REQUEST));
-          // Inform the Node - the task has asked to be STOPPED / has already stopped.
-          sendEvent(new AMNodeEventTaskAttemptEnded(containerMap.get(containerId).getContainer().getNodeId(),
containerId, event.getAttemptID(), event.failed()));
+          sendEvent(new AMContainerEvent(containerId,
+              AMContainerEventType.C_STOP_REQUEST));
+          // Inform the Node - the task has asked to be STOPPED / has already
+          // stopped.
+          sendEvent(new AMNodeEventTaskAttemptEnded(containerMap
+              .get(containerId).getContainer().getNodeId(), containerId,
+              event.getAttemptID(), event.failed()));
         } else {
-          LOG.warn("Received a STOP request for absent taskAttempt: " + event.getAttemptID());
+          LOG.warn("Received a STOP request for absent taskAttempt: "
+              + event.getAttemptID());
         }
       }
     }
@@ -409,33 +418,28 @@ protected synchronized void handleEvent(
     // XXX: Maybe send the request to the task before sending it to the scheduler - the scheduler
can then
     //  query the task to figure out whether the taskAttempt is the successfulAttempt - and
whether to count it towards the reduce ramp up.
     //  Otherwise -> Job.getCompletedMaps() - will give an out of date picture, since
the scheduler event will always be generated before the TaskCompleted event to the job.
-    
-    LOG.info("Processing the event " + event.toString());
     attemptToLaunchRequestMap.remove(event.getAttemptID());
     ContainerId containerId = assignedRequests.remove(event.getAttemptID());
     if (containerId != null) { // TODO Should not be null. Confirm.
-      sendEvent(new AMContainerTASucceededEvent(containerId, event.getAttemptID()));
-      sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap.get(containerId).getContainer().getNodeId(),
containerId, event.getAttemptID()));
+      sendEvent(new AMContainerTASucceededEvent(containerId,
+          event.getAttemptID()));
+      sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap
+          .get(containerId).getContainer().getNodeId(), containerId,
+          event.getAttemptID()));
       containerAvailable(containerId);
     } else {
       LOG.warn("Received TaskAttemptSucceededEvent for unmapped TaskAttempt: "
           + event.getAttemptID() + ". Full event: " + event);
     }
   }
-  
+
   // TODO XXX: Deal with node blacklisting.
   
   private void handleContainersAllocated(AMSchedulerEventContainersAllocated event) {
-    // TODO XXX: Maybe have an event from the Requestor -> saying AllocationChanged ->
listOfNewContainers, listOfFinishedContainers (finished containers goes to Containers directly,
but should always come in from the RM)
-    // TODO XXX
-    /*
-     * Start allocating containers. Match requests to capabilities. 
-     * Send out Container_START / Container_TA_ASSIGNED events.
-     */
-    // TODO XXX: Logging of the assigned containerIds.
 
-    LOG.info("Processing the event " + event.toString());
     availableContainerIds.addAll(event.getContainerIds());
+  
+
     if (event.didHeadroomChange() || event.getContainerIds().size() > 0) {
       // TODO XXX -> recalculateReduceSchedule in case of released containers
       // .... would imply CONTAINER_COMPLETED messages are required by the Scheduler.
@@ -445,6 +449,7 @@ protected synchronized void handleEvent(
     schedule();
   }
 
+  // TODO XXX: Deal with node blacklisting.
   
   
   // TODO Override for re-use.
@@ -480,6 +485,7 @@ protected synchronized void handleEvent(
   private synchronized void schedule() {
     assignContainers();
     requestContainers();
+    lastScheduleTime = clock.getTime();
   }
 
   protected void containerAvailable(ContainerId containerId) {
@@ -493,17 +499,6 @@ protected synchronized void handleEvent(
     }
   }
 
-  // TODO Override for container re-use.
-//  protected void containerAvailable(ContainerId containerId) {
-//    // For now releasing the container.
-//    // allocatedContainerIds.add(containerId);
-//    sendEvent(new AMContainerEvent(containerId,
-//        AMContainerEventType.C_STOP_REQUEST));
-//    // XXX A release should not be required. Only required when a container
-//    // cannot be assigned, or if there's an explicit request to stop the container,
-//    // in which case the release request will go out from the container itself.
-//  }
-
   @SuppressWarnings("unchecked")
   private int maybeComputeNormalizedRequestForType(
       AMSchedulerTALaunchRequestEvent event, TaskType taskType,
@@ -704,11 +699,14 @@ protected synchronized void handleEvent(
         " AssignedReduces:" + assignedRequests.reduces.size() +
         " completedMaps:" + getJob().getCompletedMaps() + 
         " completedReduces:" + getJob().getCompletedReduces() +
-        " containersAllocated:" + containersAllocated +
+        " containersAllocated:" + containersAllocated + //Not super useful.
+        " newContainersAllocated: " + newContainerAllocations +
+        " existingContainersAllocated: " + existingContainerAllocations +
         " containersReleased:" + containersReleased +
         " hostLocalAssigned:" + hostLocalAssigned + 
         " rackLocalAssigned:" + rackLocalAssigned +
         " availableResources(headroom):" + requestor.getAvailableResources();
+    // TODO (Post 3902): Can hostLocal/rackLocal be handled elsewhere.
   }
 
 
@@ -719,11 +717,17 @@ protected synchronized void handleEvent(
 
   @Private
   public int getMemLimit() {
-    int headRoom = requestor.getAvailableResources() != null ? requestor.getAvailableResources().getMemory()
: 0;
-    return headRoom + assignedRequests.maps.size() * mapResourceReqt + 
-       assignedRequests.reduces.size() * reduceResourceReqt;
+    int headRoom = requestor.getAvailableResources() != null ? requestor
+        .getAvailableResources().getMemory() : 0;
+    return headRoom + assignedRequests.maps.size() * mapResourceReqt
+        + assignedRequests.reduces.size() * reduceResourceReqt;
   }
   
+  
+  /**
+   * Tracks attempts for which a Container ask has been sent to the
+   * RMCommunicator.
+   */
   private class ScheduledRequests {
     
     private final LinkedList<TaskAttemptId> earlierFailedMaps = 
@@ -820,7 +824,13 @@ protected synchronized void handleEvent(
       containersAllocated += allocatedContainerIds.size();
       while (it.hasNext()) {
         ContainerId containerId = it.next();
-        Container allocated = containerMap.get(containerId).getContainer();
+        AMContainer amContainer = containerMap.get(containerId);
+        Container allocated = amContainer.getContainer();
+        if (amContainer.getState() == AMContainerState.ALLOCATED) {
+          newContainerAllocations++;
+        } else {
+          existingContainerAllocations++;
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Assigning container " + allocated.getId()
               + " with priority " + allocated.getPriority() + " to NM "
@@ -1108,6 +1118,9 @@ protected synchronized void handleEvent(
     }
   }
 
+  /**
+   * Tracks TaskAttempts which have been assigned a Container.
+   */
   private class AssignedRequests {
     private final LinkedHashMap<TaskAttemptId, Container> maps = 
       new LinkedHashMap<TaskAttemptId, Container>();
@@ -1159,6 +1172,7 @@ protected synchronized void handleEvent(
     }
     
     // TODO XXX Check where all this is being used.
+    // XXX: Likely needed in case of TA failed / killed / terminated as well.
     // Old code was removing when CONTAINER_COMPLETED was received from the RM.
     ContainerId remove(TaskAttemptId tId) {
       ContainerId containerId = null;

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1379650&r1=1379649&r2=1379650&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
Fri Aug 31 22:39:10 2012
@@ -301,9 +301,19 @@ public class RMContainerRequestor extend
     }
   }
 
+  private String getStat() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("ContainersAllocated: ").append(numContainersAllocated)
+        .append(", ContainersFinished: ").append(numFinishedContainers)
+        .append(", NumContainerReleaseRequests: ")
+        .append(numContainerReleaseRequests);
+    return sb.toString();
+  }
+  
   @SuppressWarnings("unchecked")
   @Override
   protected void heartbeat() throws Exception {
+    LOG.info("BeforeHeartbeat: " + getStat());
     int headRoom = getAvailableResources() != null ? getAvailableResources()
         .getMemory() : 0;// first time it would be null
     AMResponse response = errorCheckedMakeRemoteRequest();
@@ -322,6 +332,8 @@ public class RMContainerRequestor extend
     List<NodeReport> updatedNodeReports = response.getUpdatedNodes();
     logUpdatedNodes(updatedNodeReports);
  
+    LOG.info("AfterHeartbeat: " + getStat());
+    
     // Inform the Containers about completion..
     for (ContainerStatus c : finishedContainers) {
       eventHandler.handle(new AMContainerEventReleased(c));
@@ -420,6 +432,7 @@ public class RMContainerRequestor extend
       RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent)
rawEvent;
       releaseLock.lock();
       try {
+        // TODO XXX: Currently the RM does not handle release requests for RUNNING containers.
         numContainerReleaseRequests++;
         release.add(event.getContainerId());
       } finally {
@@ -506,7 +519,7 @@ public class RMContainerRequestor extend
   
   private void logFinishedContainers(List<ContainerStatus> finishedContainers) {
     if (finishedContainers.size() > 0) {
-      LOG.info(finishedContainers.size() + " finished");
+      LOG.info(finishedContainers.size() + " containers finished");
       for (ContainerStatus cs : finishedContainers) {
         LOG.info("FinihsedContainer: " + cs);
       }



Mime
View raw message