hadoop-mapreduce-commits mailing list archives

From: ss...@apache.org
Subject: svn commit: r1384610 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/a...
Date: Fri, 14 Sep 2012 00:36:34 GMT
Author: sseth
Date: Fri Sep 14 00:36:33 2012
New Revision: 1384610

URL: http://svn.apache.org/viewvc?rev=1384610&view=rev
Log:
MAPREDUCE-4626. Fix and re-enable RMContainerAllocator unit tests (sseth)

Added:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
Removed:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java
Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/JobImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Sep 14 00:36:33 2012
@@ -14,3 +14,5 @@ Branch MR-3902
   MAPREDUCE-4619. Change AMContainerMap to extend AbstractService (Tsuyoshi OZAWA via sseth)
 
   MAPREDUCE-4620. RMContainerAllocator should factor in nodes being blacklisted. (sseth)
+
+  MAPREDUCE-4626. Fix and re-enable RMContainerAllocator unit tests (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java Fri Sep 14 00:36:33 2012
@@ -505,8 +505,8 @@ public class TaskAttemptListenerImpl2 ex
 
   public org.apache.hadoop.mapred.Task pullTaskAttempt(ContainerId containerId) {
     // TODO XXX: pullTaskAttempt as part of the interface.
-    AMContainerImpl container = (AMContainerImpl) context
-        .getContainer(containerId);
+    AMContainerImpl container = (AMContainerImpl) context.getAllContainers()
+        .get(containerId);
     return container.pullTaskAttempt();
   }
 

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/AppContext.java Fri Sep 14 00:36:33 2012
@@ -63,9 +63,7 @@ public interface AppContext {
   
   ClusterInfo getClusterInfo();
   
-  AMContainer getContainer(ContainerId containerId);
   AMContainerMap getAllContainers();
   
-  AMNode getNode(NodeId nodeId);
   AMNodeMap getAllNodes();
 }

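The interface change above drops the per-id convenience lookups from AppContext; every caller touched by this commit switches to the map services instead. A minimal sketch of the migration pattern, assuming AMContainerMap.get and AMNodeMap.get behave as the allocator and MRApp diffs below imply (the helper class itself is hypothetical and not part of this commit):

    import org.apache.hadoop.mapreduce.v2.app2.AppContext;
    import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
    import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNode;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.NodeId;

    // Hypothetical helper, for illustration only.
    class ContextLookups {
      // Before this commit: appContext.getContainer(containerId)
      // After this commit the lookup goes through the container map service:
      static AMContainer lookupContainer(AppContext appContext, ContainerId containerId) {
        return appContext.getAllContainers().get(containerId);
      }

      // Before this commit: appContext.getNode(nodeId)
      // After this commit the lookup goes through the node map service:
      static AMNode lookupNode(AppContext appContext, NodeId nodeId) {
        return appContext.getAllNodes().get(nodeId);
      }
    }
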
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java Fri Sep 14 00:36:33 2012
@@ -984,11 +984,6 @@ public class MRAppMaster extends Composi
     public ClusterInfo getClusterInfo() {
       return this.clusterInfo;
     }
-    
-    @Override
-    public AMContainer getContainer(ContainerId containerId) {
-      return containers.get(containerId);
-    }
 
     @Override
     public AMContainerMap getAllContainers() {
@@ -996,11 +991,6 @@ public class MRAppMaster extends Composi
     }
 
     @Override
-    public AMNode getNode(NodeId nodeId) {
-      return nodes.get(nodeId);
-    }
-
-    @Override
     public AMNodeMap getAllNodes() {
       return nodes;
     }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/JobImpl.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/JobImpl.java Fri Sep 14 00:36:33 2012
@@ -784,6 +784,8 @@ public class JobImpl implements org.apac
         return job.finished(JobState.FAILED);
       }
       job.logJobHistoryFinishedEvent();
+      // TODO: Maybe set cleanup progress. Otherwise job progress will
+      // always stay at 0.95 when reported from an AM.
       return job.finished(JobState.SUCCEEDED);
     }
     return null;

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java Fri Sep 14 00:36:33 2012
@@ -882,7 +882,8 @@ public abstract class TaskAttemptImpl im
 
       // TODO XXX What all changes here after CLC construction is done. Remove TODOs after that.
       
-      Container container = ta.appContext.getContainer(event.getContainerId()).getContainer();
+      Container container = ta.appContext.getAllContainers()
+          .get(event.getContainerId()).getContainer();
       
       ta.containerId = event.getContainerId();
       ta.containerNodeId = container.getNodeId();

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Sep 14 00:36:33 2012
@@ -90,9 +90,9 @@ public class RMContainerAllocator extend
   public static final 
   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   
-  private static final Priority PRIORITY_FAST_FAIL_MAP;
-  private static final Priority PRIORITY_REDUCE;
-  private static final Priority PRIORITY_MAP;
+  protected static final Priority PRIORITY_FAST_FAIL_MAP;
+  protected static final Priority PRIORITY_REDUCE;
+  protected static final Priority PRIORITY_MAP;
 
   private Thread eventHandlingThread;
   private volatile boolean stopEventHandling;
@@ -106,10 +106,10 @@ public class RMContainerAllocator extend
     PRIORITY_MAP.setPriority(20);
   }
   
-  private final AppContext appContext;
-  private final Clock clock;
-  private Job job;
-  private final JobId jobId;
+  protected final AppContext appContext;
+  protected final Clock clock;
+  protected Job job;
+  protected final JobId jobId;
   private final RMContainerRequestor requestor;
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
@@ -328,26 +328,32 @@ public class RMContainerAllocator extend
   }
 
   protected synchronized void handleEvent(AMSchedulerEvent sEvent) {
-    // Recalculating reduce schedule here since it's required for most events.
-    recalculateReduceSchedule = true;
     LOG.info("XXX: Processing the event " + sEvent.toString());
-    switch(sEvent.getType()) {
+    switch (sEvent.getType()) {
     case S_TA_LAUNCH_REQUEST:
+      recalculateReduceSchedule = true;
       handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent);
       break;
-    case S_TA_STOP_REQUEST: //Effectively means a failure.
-      handleTaStopRequest((AMSchedulerTAStopRequestEvent)sEvent);
+    case S_TA_STOP_REQUEST: // Effectively means a failure.
+      recalculateReduceSchedule = true;
+      handleTaStopRequest((AMSchedulerTAStopRequestEvent) sEvent);
       break;
     case S_TA_SUCCEEDED:
-      handleTaSucceededRequest((AMSchedulerTASucceededEvent)sEvent);
+      recalculateReduceSchedule = true;
+      handleTaSucceededRequest((AMSchedulerTASucceededEvent) sEvent);
       break;
     case S_TA_ENDED:
-      // TODO XXX XXX: Not generated yet. Depends on E05 etc. Also look at TaskAttempt transitions.
+      recalculateReduceSchedule = true;
+      // TODO XXX XXX: Not generated yet. Depends on E05 etc. Also look at
+      // TaskAttempt transitions.
       break;
     case S_CONTAINERS_ALLOCATED:
+      // Conditional recalculateReduceSchedule
       handleContainersAllocated((AMSchedulerEventContainersAllocated) sEvent);
       break;
-    case S_CONTAINER_COMPLETED: //Nothing specific to be done in this scheduler.
+    case S_CONTAINER_COMPLETED: // Nothing specific to be done in this
+                                // scheduler.
+      recalculateReduceSchedule = true;
       break;
     case S_NODE_BLACKLISTED:
       handleNodeBlacklisted((AMSchedulerEventNodeBlacklisted) sEvent);
@@ -356,13 +362,15 @@ public class RMContainerAllocator extend
       // Ignore. RM will not allocated containers on this node.
       break;
     case S_NODE_HEALTHY:
-      // Ignore. RM will start allocating containers if there's pending requests.
+      // Ignore. RM will start allocating containers if there's pending
+      // requests.
       break;
     }
   }
 
   private void handleTaLaunchRequest(AMSchedulerTALaunchRequestEvent event) {
     // Add to queue of pending tasks.
+    recalculateReduceSchedule = true;
     attemptToLaunchRequestMap.put(event.getAttemptID(), event);
     if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
       mapResourceReqt = maybeComputeNormalizedRequestForType(event,
@@ -905,8 +913,10 @@ public class RMContainerAllocator extend
           
           // TODO Differentiation between blacklisted versus unusable nodes ?
           // Ideally there should be no assignments on unhealthy nodes.
-          blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost);
-          nodeUnhealthy = appContext.getNode(allocated.getNodeId()).isUnhealthy();
+          blackListed = appContext.getAllNodes().isHostBlackListed(
+              allocatedHost);
+          nodeUnhealthy = appContext.getAllNodes().get(allocated.getNodeId())
+              .isUnhealthy();
           
           if (nodeUnhealthy || blackListed) {
             // we need to request for a new container 
@@ -952,7 +962,7 @@ public class RMContainerAllocator extend
 
               // TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator.
 
-              if (appContext.getContainer(containerId).getState() == AMContainerState.ALLOCATED) {
+              if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
                 eventHandler.handle(new AMContainerLaunchRequestEvent(
                     containerId, attemptToLaunchRequestMap.get(assigned
                         .getAttemptId()), requestor.getApplicationAcls(),

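The priority constants and a few core fields above move from private to protected so that the allocator subclass used by the new test (RMContainerAllocatorForTest, whose body is not shown in this excerpt) can read them. A rough same-package sketch of the access this now permits; the helper class below is hypothetical:

    package org.apache.hadoop.mapreduce.v2.app2.rm;

    import org.apache.hadoop.yarn.api.records.Priority;

    // Hypothetical same-package helper, for illustration only. With the fields
    // widened to protected, code in this package can read the scheduling
    // priorities directly instead of hard-coding the values.
    class SchedulingPrioritySketch {
      static Priority mapPriority()         { return RMContainerAllocator.PRIORITY_MAP; }
      static Priority reducePriority()      { return RMContainerAllocator.PRIORITY_REDUCE; }
      static Priority fastFailMapPriority() { return RMContainerAllocator.PRIORITY_FAST_FAIL_MAP; }
    }
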
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java Fri Sep 14 00:36:33 2012
@@ -321,7 +321,7 @@ public class AMContainerImpl implements 
       // launch. Save AM resources.
       
       container.jvmId = new WrappedJvmID(taEvent.getRemoteTask().getJobID(), taEvent.getRemoteTask().isMapTask(), container.containerId.getId());
-      
+
       container.clc = createContainerLaunchContext(
           event.getApplicationAcls(), container.getContainerId(),
           container.appContext.getJob(event.getJobId()).getConf(), taEvent.getJobToken(),

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java Fri Sep 14 00:36:33 2012
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
@@ -67,15 +68,16 @@ public class AMNodeMap extends AbstractS
     this.blacklistDisablePercent = config.getInt(
           MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
           MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
-    
-    LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+
+    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
+        ", blacklistingEnabled: " + nodeBlacklistingEnabled + 
+        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
+
     if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
       throw new YarnException("Invalid blacklistDisablePercent: "
           + blacklistDisablePercent
           + ". Should be an integer between 0 and 100 or -1 to disabled");
-    }
-    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
-    
+    }    
     super.init(conf);
   }
   
@@ -175,4 +177,9 @@ public class AMNodeMap extends AbstractS
   public int size() {
     return nodeMap.size();
   }
+  
+  @Private
+  public boolean isBlacklistingIgnored() {
+    return this.ignoreBlacklisting;
+  }
 }
\ No newline at end of file

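The new @Private isBlacklistingIgnored() accessor exists so the added TestRMContainerAllocator (testIgnoreBlacklisting, below) can assert when blacklisting has been switched off. A sketch of the rule those test comments imply, assuming the usual percentage check; this is not AMNodeMap's actual implementation, which is only partially shown here:

    // Hypothetical sketch of the ignore-blacklisting rule implied by the
    // comments in testIgnoreBlacklisting: once blacklisted nodes reach
    // blacklistDisablePercent of the known nodes, blacklisting is ignored.
    class BlacklistRuleSketch {
      static boolean blacklistingIgnored(int blacklistedNodes, int knownNodes,
          int blacklistDisablePercent) {
        if (blacklistDisablePercent == -1) {
          return false; // -1 is assumed to mean "never ignore blacklisting"
        }
        return blacklistedNodes * 100 / knownNodes >= blacklistDisablePercent;
      }
      // e.g. threshold 33: 1 of 3 nodes -> 33 >= 33, blacklisting ignored;
      //      1 of 4 nodes -> 25 >= 33 is false, blacklisting stays in effect.
    }
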
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Sep 14 00:36:33 2012
@@ -88,8 +88,6 @@ public class TestTaskAttemptListenerImpl
     when(amContainers.get(containerId2)).thenReturn(amContainer2);
     
     when(appCtx.getAllContainers()).thenReturn(amContainers);
-    when(appCtx.getContainer(containerId1)).thenReturn(amContainer1);
-    when(appCtx.getContainer(containerId2)).thenReturn(amContainer2);
     when(appCtx.getEventHandler()).thenReturn(mockHandler);
     
     JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Fri Sep 14 00:36:33 2012
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
@@ -135,6 +136,8 @@ public class MRApp extends MRAppMaster {
 
   static ApplicationId applicationId;
 
+  // TODO: Look at getting rid of this. Each test should generate its own id,
+  // or have it provided. Using a custom id without updating this causes problems.
   static {
     applicationId = recordFactory.newRecordInstance(ApplicationId.class);
     applicationId.setClusterTimestamp(0);
@@ -189,6 +192,7 @@ public class MRApp extends MRAppMaster {
       }
     }
 
+    applicationId = appAttemptId.getApplicationId();
     this.maps = maps;
     this.reduces = reduces;
     this.autoComplete = autoComplete;
@@ -241,6 +245,27 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
+  /**
+   * Helper method to move a task attempt into a final state.
+   */
+  public void sendFinishToTaskAttempt(TaskAttempt taskAttempt,
+      TaskAttemptState finalState) throws Exception {
+    if (finalState == TaskAttemptState.SUCCEEDED) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(taskAttempt.getID(),
+              TaskAttemptEventType.TA_DONE));
+    } else if (finalState == TaskAttemptState.KILLED) {
+      getContext().getEventHandler()
+          .handle(
+              new TaskAttemptEventKillRequest(taskAttempt.getID(),
+                  "Kill requested"));
+    } else if (finalState == TaskAttemptState.FAILED) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(taskAttempt.getID(),
+              TaskAttemptEventType.TA_FAIL_REQUEST));
+    }
+  }
+
   public void waitForState(TaskAttempt attempt, 
       TaskAttemptState finalState) throws Exception {
     int timeoutSecs = 0;
@@ -487,7 +512,7 @@ public class MRApp extends MRAppMaster {
       case CONTAINER_LAUNCH_REQUEST:
         LOG.info("XXX: Handling CONTAINER_LAUNCH_REQUEST for: " + event.getContainerId());
         
-        AMContainer amContainer = getContext().getContainer(event.getContainerId());
+        AMContainer amContainer = getContext().getAllContainers().get(event.getContainerId());
         TaskAttemptId attemptIdForContainer = amContainer.getQueuedTaskAttempts().iterator().next();
         // Container Launched.
         getContext().getEventHandler().handle(
@@ -601,7 +626,7 @@ public class MRApp extends MRAppMaster {
             new NormalizedResourceEvent(TaskType.MAP, 100)));
         
         attemptToContainerIdMap.put(lEvent.getAttemptID(), cId);
-        if (getContext().getContainer(cId).getState() == AMContainerState.ALLOCATED) {
+        if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
           LOG.info("XXX: Sending launch request for container: " + lEvent);
           getContext().getEventHandler().handle(
               new AMContainerLaunchRequestEvent(cId, lEvent, appAcls, jobId));

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java Fri Sep 14 00:36:33 2012
@@ -46,16 +46,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.job.Job;
 import org.apache.hadoop.mapreduce.v2.app2.job.Task;
 import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
-import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
-import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNode;
 import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
 import org.apache.hadoop.mapreduce.v2.app2.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app2.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
@@ -855,24 +852,12 @@ public class TestRuntimeEstimators {
     }
 
     @Override
-    public AMContainer getContainer(ContainerId containerId) {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
     public AMContainerMap getAllContainers() {
       // TODO Auto-generated method stub
       return null;
     }
 
     @Override
-    public AMNode getNode(NodeId nodeId) {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
     public AMNodeMap getAllNodes() {
       // TODO Auto-generated method stub
       return null;

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java Fri Sep 14 00:36:33 2012
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,15 +42,23 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
 import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -278,7 +287,152 @@ public class TestJobImpl {
     isUber = testUberDecision(conf);
     Assert.assertFalse(isUber);
   }
+  
+  @Test
+  public void testReportedAppProgress() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
+    
+    int numMaps = 10;
+    int numReduces = 10;
+    int numTasks = numMaps + numReduces;
+    Configuration conf = new Configuration();
+    MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+        appAttemptId, 0), numMaps, numReduces, false,
+        this.getClass().getName(), true, 1) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+    };
+    Job job = mrApp.submit(conf);
+    DrainDispatcher dispatcher = (DrainDispatcher) mrApp.getDispatcher();
+    
+    mrApp.waitForState(job, JobState.RUNNING);
+    
+    // Empty the queue. All Attempts in RUNNING state.
+    // Using waitForState can be slow.
+
+    dispatcher.await();
+    // At this point, setup is complete. Tasks may be running.
+    float expected = 0.05f;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    Iterator<Task> it = job.getTasks().values().iterator();
+
+    // finish 1 map.
+    int toFinish = 1;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+      
+    // finish 7 more maps.
+    toFinish = 7;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    // finish remaining 2 maps.
+    toFinish = 2;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    // finish 2 reduces
+    toFinish = 2;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    // finish remaining 8 reduces.
+    toFinish = 8;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    mrApp.waitForState(job, JobState.SUCCEEDED);
+  }
+  
+  @Test
+  // Refer to comments for the previous test.
+  public void testReportedAppProgressWithOnlyMaps() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
+    
+    int numMaps = 10;
+    int numReduces = 0;
+    int numTasks = numMaps + numReduces;
+    Configuration conf = new Configuration();
+    MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+        appAttemptId, 0), numMaps, numReduces, false,
+        this.getClass().getName(), true, 1) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+    };
+    Job job = mrApp.submit(conf);
+    DrainDispatcher dispatcher = (DrainDispatcher) mrApp.getDispatcher();
+    
+    mrApp.waitForState(job, JobState.RUNNING);
+    
+    // Empty the queue. All Attempts in RUNNING state.
+    // Using waitForState can be slow.
 
+    dispatcher.await();
+    // At this point, setup is complete. Tasks may be running.
+    float expected = 0.05f;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    Iterator<Task> it = job.getTasks().values().iterator();
+
+    // finish 1 map.
+    int toFinish = 1;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+
+    // finish 5 more maps.
+    toFinish = 5;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+
+    // finish the rest.
+    toFinish = 4;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    // TODO This last verification should've been a race. Since the AM never
+    // goes beyond 0.95f, this is ok for now.
+    
+    // TODO. Ideally MRApp should provide a way to signal job completion.
+    // i.e. Do not auto complete a job after all tasks completed. Use in prev
+    // test as well.
+    mrApp.waitForState(job, JobState.SUCCEEDED);
+  }
+  
+  private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int n,
+      DrainDispatcher dispatcher) throws Exception {
+    finishNextNTasks(mrApp, it, n);
+    dispatcher.await();
+  }
+  
+  private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int n)
+      throws Exception {
+    for (int i = 0; i < n; i++) {
+      if (!it.hasNext()) {
+        throw new RuntimeException("Attempt to finish a non-existing task");
+      }
+      Task task = it.next();
+      finishTask(mrApp, task);
+    }
+  }
+
+  private void finishTask(MRApp mrApp, Task task) throws Exception {
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    mrApp.sendFinishToTaskAttempt(attempt, TaskAttemptState.SUCCEEDED);
+  }
   private boolean testUberDecision(Configuration conf) {
     JobID jobID = JobID.forName("job_1234567890000_0001");
     JobId jobId = TypeConverter.toYarn(jobID);

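The progress assertions in the tests above follow a simple model: 5% is reported once setup completes, the next 90% is spread evenly over the map and reduce tasks, and the last 5% (cleanup) is never reported, which is the 0.95 ceiling noted in the JobImpl TODO earlier in this commit. A small sketch of that arithmetic, derived from the assertions rather than from JobImpl itself:

    // Hypothetical helper mirroring the expected-progress bookkeeping in the
    // tests above; derived from the assertions, not from JobImpl.
    class ExpectedProgressSketch {
      static float expectedProgress(int finishedTasks, int totalTasks) {
        return 0.05f + finishedTasks * 0.9f / totalTasks; // setup + per-task share
      }
      // e.g. 10 maps + 10 reduces, 8 tasks finished:
      //   0.05 + 8 * 0.9 / 20 = 0.41
      // all 20 finished: 0.05 + 0.9 = 0.95 (cleanup share never added)
    }
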
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java?rev=1384610&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java Fri Sep 14 00:36:33 2012
@@ -0,0 +1,987 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyFloat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventNodeCountUpdated;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Test;
+
+
+public class TestRMContainerAllocator {
+
+  static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
+
+  static final Priority MAP_PRIORITY = RMContainerAllocatorForTest.getMapPriority();
+  static final Priority REDUCE_PRIORITY = RMContainerAllocatorForTest.getReducePriority();
+  static final Priority FAST_FAIL_MAP_PRIORITY = RMContainerAllocatorForTest.getFailedMapPriority();
+  
+  private static final int port = 3333;
+
+  /**
+   * Verifies simple allocation. Matches pending requests to assigned containers.
+   */
+  @Test
+  public void testSimple() {
+    LOG.info("Running testSimple");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 1024,
+        new String[] { "h2" });
+    AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 1024,
+        new String[] { "h3" });
+
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+    scheduler.handleEvent(event3);
+
+    assertEquals(3, rmComm.addRequests.size());
+
+    Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h2", 1024, MAP_PRIORITY);
+    Container container3 = newContainer(appContext, 3, "h3", 1024, MAP_PRIORITY);
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+    containerIds.add(container3.getId());
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event2,
+        event3 }, eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+  }
+  
+  /**
+   * Verifies allocation based on resource ask and allocation.
+   */
+  @Test
+  public void testResource() {
+    LOG.info("Running testResource");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 2048,
+        new String[] { "h2" });
+
+    // Register the asks with the AMScheduler
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+
+    assertEquals(2, rmComm.addRequests.size());
+
+    Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h2", 2048, MAP_PRIORITY);
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event2 },
+        eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+  }
+  
+  @Test
+  public void testMapReduceScheduling() {
+    LOG.info("Running testMapReduceScheduling");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    // Map, previously failed.
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 2048,
+        new String[] { "h1", "h2" }, true, false);
+
+    // Reduce, first attempt.
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 3000,
+        new String[] { "h1" }, false, true);
+
+    // Map, first attempt.
+    AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 2048,
+        new String[] { "h3" }, false, false);
+
+    // Register the asks with the AMScheduler
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+    scheduler.handleEvent(event3);
+
+    // The reduce should not have been asked for yet - event2
+    assertEquals(2, rmComm.addRequests.size());
+
+   
+    Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h1", 3072,
+        REDUCE_PRIORITY);
+    Container container3 = newContainer(appContext, 3, "h3", 2048, MAP_PRIORITY);
+    Container container4 = newContainer(appContext, 4, "h1", 2048,
+        FAST_FAIL_MAP_PRIORITY);
+    // Container1 - release low mem.
+    // Container2 - release no pending reduce request.
+    // Container3 - assign to t3
+    // Container4 - assign to t1
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+    containerIds.add(container3.getId());
+    containerIds.add(container4.getId());
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    // Two stop container events for Container1 and Container2 ?
+    assertEquals(2, eventHandler.stopEvents.size());
+
+    // Since maps have been assigned containers. Verify that a request is sent
+    // out for the pending reduce task.
+    assertEquals(3, rmComm.addRequests.size());
+
+    // Verify map assignments.
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event3 },
+        eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+
+    eventHandler.reset();
+
+    Container container5 = newContainer(appContext, 5, "h1", 3072,
+        REDUCE_PRIORITY); // assign to t2
+    containerIds.clear();
+    containerIds.add(container5.getId());
+    allocatedEvent = new AMSchedulerEventContainersAllocated(containerIds,
+        false);
+    scheduler.handleEvent(allocatedEvent);
+
+    // Verify reduce assignment.
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event2 },
+        eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+  }
+
+  @Test
+  public void testReduceScheduling() throws Exception {
+    int totalMaps = 10;
+    int succeededMaps = 1;
+    int scheduledMaps = 10;
+    int scheduledReduces = 0;
+    int assignedMaps = 2;
+    int assignedReduces = 0;
+    int mapResourceReqt = 1024;
+    int reduceResourceReqt = 2*1024;
+    int numPendingReduces = 4;
+    float maxReduceRampupLimit = 0.5f;
+    float reduceSlowStart = 0.2f;
+    
+    RMContainerAllocator allocator = mock(RMContainerAllocator.class);
+    doCallRealMethod().when(allocator).
+        scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
+            anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
+    
+    // Test slow-start
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, never()).setIsReduceStarted(true);
+    
+    // verify slow-start still in effect when no more maps need to
+    // be scheduled but some have yet to complete
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps,
+        0, scheduledReduces,
+        totalMaps - succeededMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, never()).setIsReduceStarted(true);
+    verify(allocator, never()).scheduleAllReduces();
+
+    succeededMaps = 3;
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, times(1)).setIsReduceStarted(true);
+    
+    // Test reduce ramp-up
+    doReturn(100 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator).rampUpReduces(anyInt());
+    verify(allocator, never()).rampDownReduces(anyInt());
+
+    // Test reduce ramp-down
+    scheduledReduces = 3;
+    doReturn(10 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator).rampDownReduces(anyInt());
+  }
+
+  @Test
+  public void testBlackListedNodes() throws Exception {
+    LOG.info("Running testBlackListedNodes");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    // 3 requests on 3 hosts.
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
+        new String[] { "h1", "h4" });
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 1024,
+        new String[] { "h2" });
+    AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 1024,
+        new String[] { "h3" });
+
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+    scheduler.handleEvent(event3);
+
+    assertEquals(3, rmComm.addRequests.size());
+
+    // 3 containers on 3 hosts.
+    Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h2", 1024, MAP_PRIORITY);
+    Container container3 = newContainer(appContext, 3, "h3", 1024, MAP_PRIORITY);
+
+    // Simulate two blacklisted nodes.
+    appContext.getAllNodes().handle(
+        new AMNodeEvent(container1.getNodeId(),
+            AMNodeEventType.N_NODE_WAS_BLACKLISTED));
+    appContext.getAllNodes().handle(
+        new AMNodeEvent(container2.getNodeId(),
+            AMNodeEventType.N_NODE_WAS_BLACKLISTED));
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+    containerIds.add(container3.getId());
+
+    rmComm.reset();
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    // Only container 3 should have been assigned.
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event3 },
+        eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+
+    // Verify container stop events for the remaining two containers.
+    assertEquals(2, eventHandler.stopEvents.size());
+    Set<ContainerId> tmpSet = new HashSet<ContainerId>();
+    tmpSet.add(container1.getId());
+    tmpSet.add(container2.getId());
+    for (AMContainerEvent ame : eventHandler.stopEvents) {
+      tmpSet.remove(ame.getContainerId());
+    }
+    assertEquals(0, tmpSet.size());
+
+    // Verify new request events were sent out to replace these containers.
+    assertEquals(2, rmComm.addRequests.size());
+    // One of the requests should refer to h4.
+    boolean hostSeen = false;
+    for (ContainerRequest c : rmComm.addRequests) {
+      if (c.hosts.length != 0) {
+        if (!hostSeen) {
+          assertEquals(1, c.hosts.length);
+          assertEquals("h4", c.hosts[0]);
+          hostSeen = true;
+        } else {
+          fail("Only one request should have a host");
+        }
+      }
+    }
+  }
+  
+  @Test
+  public void testIgnoreBlacklisting() throws Exception {
+    LOG.info("Running testIgnoreBlacklisting");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
+
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    int attemptId = 0;
+    int currentHostNum = 0;
+    NodeId[] nodeIds = new NodeId[10];
+
+    // Add a node.
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+
+    // known=1, blacklisted=0. IgnoreBlacklisting=false, Assign 1
+    assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Blacklist node1.
+    blacklistNode(nodeIds[0], appContext);
+
+    // known=1, blacklisted=1. IgnoreBlacklisting=true, Assign 1
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    currentHostNum++;
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    currentHostNum++;
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. (Request
+    // on non blacklisted)
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. (Request
+    // on blacklisted)
+    assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
+        eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    currentHostNum++;
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    // Known=4, blacklisted=1, ignore should be false - assign 1 anyway.
+    // (Request on non blacklisted)
+    assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Known=4, blacklisted=1, ignore should be false - no assignment.
+    // (Request on blacklisted)
+    assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
+        eventHandler, appContext);
+    assertEquals(0, eventHandler.assignEvents.size());
+
+    // Blacklist node2.
+    blacklistNode(nodeIds[1], appContext);
+
+    // Known=4, blacklisted=2, ignore should be true - assign 1 anyway. (Request
+    // on blacklisted)
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
+        eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Blacklist node3. (While ignore is enabled)
+    blacklistNode(nodeIds[2], appContext);
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+
+    currentHostNum++;
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    // Known=5, blacklisted=3, ignore should be true - assign 1. (Request
+    // on non-blacklisted)
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Add 5 more nodes.
+    for (int i = 0; i < 5; i++) {
+      currentHostNum++;
+      nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    }
+
+    // Known=10, blacklisted=3, ignore should be false - no assignment on the
+    // blacklisted node.
+    assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[2], ++attemptId, scheduler,
+        eventHandler, appContext);
+    assertEquals(0, eventHandler.assignEvents.size());
+
+  }
+
+  @Test
+  public void testCompletedTasksRecalculateSchedule() throws Exception {
+    LOG.info("Running testCompletedTasksRecalculateSchedule");
+    YarnConfiguration conf = new YarnConfiguration();
+
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RecalculateContainerAllocator scheduler = new RecalculateContainerAllocator(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+    Job job = appContext.getJob(jobId);
+    doReturn(10).when(job).getTotalMaps();
+    doReturn(10).when(job).getTotalReduces();
+    doReturn(2).when(job).getCompletedMaps();
+    doReturn(0).when(job).getCompletedReduces();
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+
+    // Ignore the first allocate.
+    scheduler.handleEvent(allocatedEvent);
+
+    // Since nothing changed, recalculate reduce = false.
+    scheduler.recalculatedReduceSchedule = false;
+    scheduler.handleEvent(allocatedEvent);
+    assertFalse("Unexpected recalculate of reduce schedule",
+        scheduler.recalculatedReduceSchedule);
+
+    // Change completedMaps. Recalculate reduce should be true.
+    doReturn(1).when(job).getCompletedMaps();
+    scheduler.handleEvent(allocatedEvent);
+    assertTrue("Expected recalculate of reduce schedule",
+        scheduler.recalculatedReduceSchedule);
+
+    // Since nothing changed, recalculate reduce = false.
+    scheduler.recalculatedReduceSchedule = false;
+    scheduler.handleEvent(allocatedEvent);
+    assertFalse("Unexpected recalculate of reduce schedule",
+        scheduler.recalculatedReduceSchedule);
+  }
+  
+  // TODO XXX Unit test for AMNode to simulate node health status change.
+
+  private void blacklistNode(NodeId nodeId, AppContext appContext) {
+    appContext.getAllNodes().handle(
+        new AMNodeEvent(nodeId, AMNodeEventType.N_NODE_WAS_BLACKLISTED));
+  }
+
+  private NodeId addNode(int currentHostNum, AppContext appContext) {
+    NodeId nodeId = BuilderUtils.newNodeId("h" + currentHostNum, port);
+    appContext.getAllNodes().nodeSeen(nodeId);
+    appContext.getAllNodes().handle(
+        new AMNodeEventNodeCountUpdated(appContext.getAllNodes().size()));
+    return nodeId;
+  }
+
+  /**
+   * Generates the events for a TaskAttempt start request and an associated 
+   * container assignment. Resets TrackingEventHandler statistics.
+   */
+  void assignContainerOnHost(JobId jobId, NodeId nodeId, int attemptId,
+      RMContainerAllocatorForTest scheduler, TrackingEventHandler eventHandler,
+      AppContext appContext) {
+    eventHandler.reset();
+    AMSchedulerTALaunchRequestEvent launchRequest = createTALaunchReq(jobId,
+        attemptId, 1024, new String[] { nodeId.getHost() });
+    scheduler.handleEvent(launchRequest);
+    Container container = newContainer(appContext, attemptId, nodeId.getHost(),
+        1024, MAP_PRIORITY);
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container.getId());
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+  }
+
+  /**
+   * Verify launch requests and container assignments against the original
+   * task attempt launch requests.
+   */
+  private void checkAssignments(AMSchedulerTALaunchRequestEvent[] requests,
+      List<AMContainerLaunchRequestEvent> launchRequests,
+      List<AMContainerAssignTAEvent> assignEvents, boolean checkHostMatch,
+      AppContext appContext) {
+
+    assertNotNull("Containers not assigned", launchRequests);
+    assertNotNull("Containers not assigned", assignEvents);
+    assertEquals("LaunchRequest Count not correct", requests.length,
+        launchRequests.size());
+    assertEquals("Assigned Count not correct", requests.length,
+        assignEvents.size());
+
+    Set<ContainerId> containerIds = new HashSet<ContainerId>();
+    // Check for uniqueness of container id launch requests
+    for (AMContainerLaunchRequestEvent launchRequest : launchRequests) {
+      containerIds.add(launchRequest.getContainerId());
+    }
+    assertEquals("Multiple launch requests for same container id",
+        assignEvents.size(), containerIds.size());
+
+    // Check for uniqueness of container Id assignments.
+    containerIds.clear();
+    for (AMContainerAssignTAEvent assignEvent : assignEvents) {
+      containerIds.add(assignEvent.getContainerId());
+    }
+    assertEquals("Assigned container Ids not unique", assignEvents.size(),
+        containerIds.size());
+
+    AMContainerAssignTAEvent assignment = null;
+    // Check that all requests were assigned a container.
+    for (AMSchedulerTALaunchRequestEvent request : requests) {
+      for (AMContainerAssignTAEvent assignEvent : assignEvents) {
+        if (request.getAttemptID().equals(assignEvent.getTaskAttemptId())) {
+          assignment = assignEvent;
+          break;
+        }
+      }
+      checkAssignment(request, assignment, checkHostMatch, appContext);
+      assignment = null;
+    }
+  }
+
+  
+  /**
+   * Verify assignment for a single request / allocation, optionally checking
+   * for the requested host.
+   */
+  private void checkAssignment(AMSchedulerTALaunchRequestEvent request,
+      AMContainerAssignTAEvent assignEvent, boolean checkHostMatch,
+      AppContext appContext) {
+
+    Assert.assertNotNull(
+        "Nothing assigned to attempt " + request.getAttemptID(), assignEvent);
+    if (checkHostMatch) {
+      if (request.getHosts().length == 0) {
+        return;
+      } else {
+        Assert.assertTrue(
+            "Not assigned to requested host",
+            Arrays.asList(request.getHosts()).contains(
+                appContext.getAllContainers().get(assignEvent.getContainerId())
+                    .getContainer().getNodeId().getHost()));
+      }
+    }
+  }
+
+  
+  /**
+   * Creates a container for allocation, registering the associated node with
+   * the AMNodeMap and the container with the AMContainerMap.
+   */
+  private Container newContainer(AppContext appContext,
+      int containerNum, String host, int memory, Priority priority) {
+    ContainerId containerId = BuilderUtils.newContainerId(appContext.getApplicationAttemptId(),
+        containerNum);
+    NodeId nodeId = BuilderUtils.newNodeId(host, port);
+    
+    appContext.getAllNodes().nodeSeen(nodeId);
+    Resource resource = BuilderUtils.newResource(memory);
+    Container container = BuilderUtils.newContainer(containerId, nodeId, host
+        + ":8000", resource, priority, null);
+    appContext.getAllContainers().addNewContainer(container);
+    return container;
+  }
+  
+  
+  private AMSchedulerTALaunchRequestEvent createTALaunchReq(JobId jobId,
+      int taskAttemptId, int memory, String[] hosts) {
+    return createTALaunchReq(jobId, taskAttemptId, memory, hosts, false, false);
+  }
+  
+  private AMSchedulerTALaunchRequestEvent createTALaunchReq(JobId jobId,
+      int taskAttemptId, int memory, String[] hosts,
+      boolean earlierFailedAttempt, boolean reduce) {
+    Resource resource = BuilderUtils.newResource(memory);
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    String[] hostArray;
+    String[] rackArray;
+    if (earlierFailedAttempt) {
+      hostArray = new String[0];
+      rackArray = new String[0];
+    } else {
+      hostArray = hosts;
+      rackArray = new String[] { NetworkTopology.DEFAULT_RACK };
+    }
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+        taskAttemptId);
+    AMSchedulerTALaunchRequestEvent event = new AMSchedulerTALaunchRequestEvent(
+        attemptId, earlierFailedAttempt, resource, null, null, null, null,
+        hostArray, rackArray);
+    return event;
+  }
+  
+  static class RMContainerAllocatorForTest extends RMContainerAllocator {
+
+    public RMContainerAllocatorForTest(RMContainerRequestor requestor,
+        AppContext appContext) {
+      super(requestor, appContext);
+    }
+    
+    @Override
+    public void start() {
+      this.job = appContext.getJob(jobId);
+    }
+    
+    @Override
+    protected void handleEvent(AMSchedulerEvent event) {
+      super.handleEvent(event);
+    }
+    
+    static Priority getMapPriority() {
+      return BuilderUtils.newPriority(PRIORITY_MAP.getPriority());
+    }
+    
+    static Priority getReducePriority() {
+      return BuilderUtils.newPriority(PRIORITY_REDUCE.getPriority());
+    }
+    
+    static Priority getFailedMapPriority() {
+      return BuilderUtils.newPriority(PRIORITY_FAST_FAIL_MAP.getPriority());
+    }
+  }
+
+  static class RecalculateContainerAllocator extends
+      RMContainerAllocatorForTest {
+
+    boolean recalculatedReduceSchedule = false;
+
+    public RecalculateContainerAllocator(RMContainerRequestor requestor,
+        AppContext appContext) {
+      super(requestor, appContext);
+    }
+
+    @Override
+    public void scheduleReduces(int totalMaps, int completedMaps,
+        int scheduledMaps, int scheduledReduces, int assignedMaps,
+        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+        int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+      recalculatedReduceSchedule = true;
+    }
+  }
+
+  class TrackingAMContainerRequestor extends RMContainerRequestor {
+
+    List<ContainerRequest> addRequests = new LinkedList<RMContainerRequestor.ContainerRequest>();
+    List<ContainerRequest> decRequests = new LinkedList<RMContainerRequestor.ContainerRequest>();
+    private Resource minContainerMemory = Records.newRecord(Resource.class);
+    private Resource maxContainerMemory = Records.newRecord(Resource.class);
+    
+    void reset() {
+      addRequests.clear();
+      decRequests.clear();
+    }
+
+    public TrackingAMContainerRequestor(AppContext context) {
+      this(context, 1024, 10240);
+    }
+
+    public TrackingAMContainerRequestor(AppContext context,
+        int minContainerMemory, int maxContainerMemory) {
+      super(null, context);
+      this.minContainerMemory.setMemory(minContainerMemory);
+      this.maxContainerMemory.setMemory(maxContainerMemory);
+    }
+   
+    @Override
+    public void addContainerReq(ContainerRequest request) {
+      addRequests.add(request);
+    }
+    
+    @Override
+    public void decContainerReq(ContainerRequest request) {
+      decRequests.add(request);
+    }
+    
+    @Override
+    public Map<ApplicationAccessType, String> getApplicationAcls() {
+      return null;
+    }
+    
+    @Override
+    protected Resource getAvailableResources() {
+      return BuilderUtils.newResource(0);
+    }
+    
+    @Override
+    protected Resource getMaxContainerCapability() {
+      return maxContainerMemory;
+    }
+    
+    @Override
+    protected Resource getMinContainerCapability() {
+      return minContainerMemory;
+    }
+
+    @Override
+    public void register() {
+    }
+
+    @Override
+    public void unregister() {
+    }
+
+    @Override
+    public void startAllocatorThread() {
+    }
+    
+    @Override
+    public AMRMProtocol createSchedulerProxy() {
+      return null;
+    }
+  }
+  
+  @SuppressWarnings("rawtypes")
+  private class TrackingEventHandler implements EventHandler {
+
+    List<AMContainerLaunchRequestEvent> launchRequests = new LinkedList<AMContainerLaunchRequestEvent>();
+    List<AMContainerAssignTAEvent> assignEvents = new LinkedList<AMContainerAssignTAEvent>();
+    List<AMContainerEvent> stopEvents = new LinkedList<AMContainerEvent>();
+    List<RMCommunicatorContainerDeAllocateRequestEvent> releaseEvents = new LinkedList<RMCommunicatorContainerDeAllocateRequestEvent>();
+    
+    @Override
+    public void handle(Event event) {
+      if (event.getType() == AMContainerEventType.C_START_REQUEST) {
+        launchRequests.add((AMContainerLaunchRequestEvent)event);
+      } else if (event.getType() == AMContainerEventType.C_ASSIGN_TA) {
+        assignEvents.add((AMContainerAssignTAEvent)event);
+      } else if (event.getType() == AMContainerEventType.C_STOP_REQUEST) {
+        stopEvents.add((AMContainerEvent)event);
+      } else if (event.getType() == RMCommunicatorEventType.CONTAINER_DEALLOCATE) {
+        releaseEvents.add((RMCommunicatorContainerDeAllocateRequestEvent)event);
+      }
+    }
+    
+    public void reset() {
+      this.launchRequests.clear();
+      this.assignEvents.clear();
+      this.stopEvents.clear();
+    }
+  }
+
+  // TODO Allow specifying the jobId as a parameter
+  @SuppressWarnings("rawtypes") 
+  private AppContext setupDefaultTestContext(EventHandler eventHandler,
+      Configuration conf) {
+    AppContext appContext = mock(AppContext.class);
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
+    JobID id = TypeConverter.fromYarn(appId);
+    JobId jobId = TypeConverter.toYarn(id);
+
+    Job mockJob = mock(Job.class);
+    when(mockJob.getID()).thenReturn(jobId);
+    when(mockJob.getProgress()).thenReturn(0.0f);
+
+    Clock clock = new ControlledClock(new SystemClock());
+
+    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    amNodeMap.init(conf);
+    amNodeMap.start();
+    
+
+    AMContainerMap amContainerMap = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        eventHandler, appContext);
+    amContainerMap.init(conf);
+    amContainerMap.start();
+    when(appContext.getAllContainers()).thenReturn(amContainerMap);
+
+    when(appContext.getApplicationID()).thenReturn(appId);
+    when(appContext.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(appContext.getEventHandler()).thenReturn(eventHandler);
+    when(appContext.getJob(jobId)).thenReturn(mockJob);
+    when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getAllNodes()).thenReturn(amNodeMap);
+
+    return appContext;
+  }
+}
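
Editorial note: the blacklisting behaviour exercised by testIgnoreBlacklisting above follows a simple percentage rule: blacklisting is ignored once the blacklisted nodes reach the configured share of all known nodes (33% in that test, with -1 meaning "never ignore" in testBlackListedNodes). The sketch below is illustrative only and is not the AMNodeMap implementation; the class, method and parameter names are invented, but the comparison is consistent with the known/blacklisted counts the test asserts.

    // Illustrative sketch, not AMNodeMap code: when does the AM stop honouring
    // its blacklist? Assumes an integer percentage comparison against the
    // configured MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT value.
    public class BlacklistThresholdSketch {
      static boolean ignoreBlacklisting(int knownNodes, int blacklistedNodes,
          int disablePercent) {
        if (knownNodes == 0 || disablePercent <= 0) {
          return false; // -1 or 0 disables the "ignore blacklisting" escape hatch
        }
        return (blacklistedNodes * 100) / knownNodes >= disablePercent;
      }

      public static void main(String[] args) {
        // Mirrors a few of the transitions asserted in testIgnoreBlacklisting.
        System.out.println(ignoreBlacklisting(1, 1, 33));   // true  (100%)
        System.out.println(ignoreBlacklisting(3, 1, 33));   // true  (33%)
        System.out.println(ignoreBlacklisting(4, 1, 33));   // false (25%)
        System.out.println(ignoreBlacklisting(10, 3, 33));  // false (30%)
      }
    }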

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java?rev=1384610&r1=1384609&r2=1384610&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java Fri Sep 14 00:36:33 2012
@@ -1,7 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.mapreduce.v2.app2.rm;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -101,6 +120,35 @@ public class TestRMContainerRequestor {
     verifyAsks(askSet, 4, 3, 4, 4);
   }
   
+  /**
+   * Verify job progress is being reported to the RM.
+   */
+  @Test
+  public void testProgressReportedToRM() throws Exception {
+    AppContext appContext = setupDefaultTestContext();
+    TrackingAMRMProtocol amrm = new TrackingAMRMProtocol();
+    RMContainerRequestorForTest rmComm = new RMContainerRequestorForTest(appContext, amrm);
+    rmComm.init(new YarnConfiguration());
+    rmComm.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+    Job job = appContext.getJob(jobId);
+    
+    rmComm.heartbeat();
+    assertEquals(0.0f, amrm.allocateRequest.getProgress(), 0.001);
+    
+    doReturn(0.11f).when(job).getProgress();
+    rmComm.heartbeat();
+    assertEquals(0.11f, amrm.allocateRequest.getProgress(), 0.001);
+    
+    doReturn(0.95f).when(job).getProgress();
+    rmComm.heartbeat();
+    assertEquals(0.95f, amrm.allocateRequest.getProgress(), 0.001);
+  }
+  
+  
+  
   private void verifyAsks(Set<ResourceRequest> askSet, int host1, int host2, int rack1, int generic) {
     for (ResourceRequest rr : askSet) {
       if (rr.getHostName().equals("*")) {
@@ -177,10 +225,6 @@ public class TestRMContainerRequestor {
   class RMContainerRequestorForTest extends RMContainerRequestor {
 
     private AMRMProtocol amRmProtocol;
-    
-    public RMContainerRequestorForTest(AppContext context) {
-      super(null, context);
-    }
 
     public RMContainerRequestorForTest(AppContext context, AMRMProtocol amrm) {
       super(null, context);
@@ -211,6 +255,46 @@ public class TestRMContainerRequestor {
     @Override public void startAllocatorThread() {}
   }
 
+  private static class TrackingAMRMProtocol implements AMRMProtocol {
+
+    RegisterApplicationMasterRequest registerRequest;
+    FinishApplicationMasterRequest finishApplicationMasterRequest;
+    AllocateRequest allocateRequest;
+
+    public void reset() {
+      this.registerRequest = null;
+      this.finishApplicationMasterRequest = null;
+      this.allocateRequest = null;
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnRemoteException {
+      this.registerRequest = request;
+      return null;
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnRemoteException {
+      this.finishApplicationMasterRequest = request;
+      return null;
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnRemoteException {
+      this.allocateRequest = request;
+      AMResponse amResponse = BuilderUtils.newAMResponse(
+          new ArrayList<Container>(), BuilderUtils.newResource(1024),
+          new ArrayList<ContainerStatus>(), false, 1,
+          new ArrayList<NodeReport>());
+      AllocateResponse allocateResponse = BuilderUtils.newAllocateResponse(
+          amResponse, 2);
+      return allocateResponse;
+    }
+  }
+
   private AppContext setupDefaultTestContext() {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
     ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
@@ -219,8 +303,8 @@ public class TestRMContainerRequestor {
     JobId jobId = TypeConverter.toYarn(id);
 
     Job mockJob = mock(Job.class);
-    when(mockJob.getID()).thenReturn(jobId);
-    when(mockJob.getProgress()).thenReturn(0.0f);
+    doReturn(0.0f).when(mockJob).getProgress();
+    doReturn(jobId).when(mockJob).getID();
 
     @SuppressWarnings("rawtypes")
     EventHandler handler = mock(EventHandler.class);



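Editorial note: testProgressReportedToRM above checks that each heartbeat forwards the job's current progress to the RM inside the AllocateRequest. A minimal hedged sketch of that behaviour follows; it is not the RMContainerRequestor.heartbeat() implementation, the class and method names are invented, and a real heartbeat would also set the application attempt id, response id and the ask/release lists before calling allocate().

    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.util.Records;

    public class ProgressHeartbeatSketch {
      // 'progress' stands in for appContext.getJob(jobId).getProgress(),
      // which the test drives to 0.0f, 0.11f and 0.95f.
      static AllocateRequest buildAllocateRequest(float progress) {
        AllocateRequest request = Records.newRecord(AllocateRequest.class);
        request.setProgress(progress); // value read back via getProgress() in the test
        return request;
      }
    }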