hadoop-mapreduce-commits mailing list archives

From: ss...@apache.org
Subject: svn commit: r1376283 [17/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/s...
Date: Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRClientService.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRClientService.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRClientService.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,215 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.junit.Test;
+
+public class TestMRClientService {
+
+  private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  
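+  // End-to-end check of MRClientService: runs a single-map job, pushes a
+  // diagnostic and a status update for the attempt, then exercises each
+  // MRClientProtocol RPC over YarnRPC and verifies the returned records.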
+  @Test
+  public void test() throws Exception {
+    MRAppWithClientService app = new MRAppWithClientService(1, 0, false);
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.RUNNING);
+
+    // send the diagnostic
+    String diagnostic1 = "Diagnostic1";
+    String diagnostic2 = "Diagnostic2";
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptDiagnosticsUpdateEvent(attempt.getID(), diagnostic1));
+
+    // build the status update
+    TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+    taskAttemptStatus.id = attempt.getID();
+    taskAttemptStatus.progress = 0.5f;
+    taskAttemptStatus.stateString = "RUNNING";
+    taskAttemptStatus.taskState = TaskAttemptState.RUNNING;
+    taskAttemptStatus.phase = Phase.MAP;
+    taskAttemptStatus.outputSize = 3;
+    // send the status update
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus));
+
+    
+    // verify that all objects are fully populated by invoking RPCs.
+    YarnRPC rpc = YarnRPC.create(conf);
+    MRClientProtocol proxy =
+      (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+          app.clientService.getBindAddress(), conf);
+    GetCountersRequest gcRequest =
+        recordFactory.newRecordInstance(GetCountersRequest.class);    
+    gcRequest.setJobId(job.getID());
+    Assert.assertNotNull("Counters is null",
+        proxy.getCounters(gcRequest).getCounters());
+
+    GetJobReportRequest gjrRequest =
+        recordFactory.newRecordInstance(GetJobReportRequest.class);
+    gjrRequest.setJobId(job.getID());
+    JobReport jr = proxy.getJobReport(gjrRequest).getJobReport();
+    verifyJobReport(jr);
+    
+
+    GetTaskAttemptCompletionEventsRequest gtaceRequest =
+        recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+    gtaceRequest.setJobId(job.getID());
+    gtaceRequest.setFromEventId(0);
+    gtaceRequest.setMaxEvents(10);
+    Assert.assertNotNull("TaskCompletionEvents is null", 
+        proxy.getTaskAttemptCompletionEvents(gtaceRequest).getCompletionEventList());
+
+    GetDiagnosticsRequest gdRequest =
+        recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
+    gdRequest.setTaskAttemptId(attempt.getID());
+    Assert.assertNotNull("Diagnostics is null", 
+        proxy.getDiagnostics(gdRequest).getDiagnosticsList());
+
+    GetTaskAttemptReportRequest gtarRequest =
+        recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+    gtarRequest.setTaskAttemptId(attempt.getID());
+    TaskAttemptReport tar =
+        proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport();
+    verifyTaskAttemptReport(tar);
+    
+
+    GetTaskReportRequest gtrRequest =
+        recordFactory.newRecordInstance(GetTaskReportRequest.class);
+    gtrRequest.setTaskId(task.getID());
+    Assert.assertNotNull("TaskReport is null", 
+        proxy.getTaskReport(gtrRequest).getTaskReport());
+
+    GetTaskReportsRequest gtreportsRequest =
+        recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+    gtreportsRequest.setJobId(job.getID());
+    gtreportsRequest.setTaskType(TaskType.MAP);
+    Assert.assertNotNull("TaskReports for map is null", 
+        proxy.getTaskReports(gtreportsRequest).getTaskReportList());
+
+    gtreportsRequest =
+        recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+    gtreportsRequest.setJobId(job.getID());
+    gtreportsRequest.setTaskType(TaskType.REDUCE);
+    Assert.assertNotNull("TaskReports for reduce is null", 
+        proxy.getTaskReports(gtreportsRequest).getTaskReportList());
+
+    List<String> diag = proxy.getDiagnostics(gdRequest).getDiagnosticsList();
+    Assert.assertEquals("Num diagnostics not correct", 1 , diag.size());
+    Assert.assertEquals("Diag 1 not correct",
+        diagnostic1, diag.get(0).toString());
+
+    TaskReport taskReport = proxy.getTaskReport(gtrRequest).getTaskReport();
+    Assert.assertEquals("Num diagnostics not correct", 1,
+        taskReport.getDiagnosticsCount());
+
+    //send the done signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
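+  // Checks that the JobReport returned over RPC is populated with the AM info
+  // (node manager host/ports, attempt id, start time) registered by MRApp.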
+  private void verifyJobReport(JobReport jr) {
+    Assert.assertNotNull("JobReport is null", jr);
+    List<AMInfo> amInfos = jr.getAMInfos();
+    Assert.assertEquals(1, amInfos.size());
+    Assert.assertEquals(JobState.RUNNING, jr.getJobState());
+    AMInfo amInfo = amInfos.get(0);
+    Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+    Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+    Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
+    Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId()
+        .getAttemptId());
+    Assert.assertTrue(amInfo.getStartTime() > 0);
+    Assert.assertEquals(false, jr.isUber());
+  }
+  
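+  // Checks that the TaskAttemptReport reflects a RUNNING attempt along with
+  // the node manager and container details assigned by MRApp.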
+  private void verifyTaskAttemptReport(TaskAttemptReport tar) {
+    Assert.assertNotNull("TaskAttemptReport is null", tar);
+    Assert.assertEquals(TaskAttemptState.RUNNING, tar.getTaskAttemptState());
+    Assert.assertEquals(MRApp.NM_HOST, tar.getNodeManagerHost());
+    Assert.assertEquals(MRApp.NM_PORT, tar.getNodeManagerPort());
+    Assert.assertEquals(MRApp.NM_HTTP_PORT, tar.getNodeManagerHttpPort());
+    Assert.assertEquals(1, tar.getContainerId().getApplicationAttemptId()
+        .getAttemptId());
+  }
+  
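+  // MRApp variant that wires in a real MRClientService and exposes it so the
+  // test can look up the RPC bind address for the client proxy.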
+  class MRAppWithClientService extends MRApp {
+    MRClientService clientService = null;
+    MRAppWithClientService(int maps, int reduces, boolean autoComplete) {
+      super(maps, reduces, autoComplete, "MRAppWithClientService", true);
+    }
+    @Override
+    protected ClientService createClientService(AppContext context) {
+      clientService = new MRClientService(context);
+      return clientService;
+    }
+  }
+  
+  public static void main(String[] args) throws Exception {
+    TestMRClientService t = new TestMRClientService();
+    t.test();
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,1480 @@
+///**
+//* Licensed to the Apache Software Foundation (ASF) under one
+//* or more contributor license agreements.  See the NOTICE file
+//* distributed with this work for additional information
+//* regarding copyright ownership.  The ASF licenses this file
+//* to you under the Apache License, Version 2.0 (the
+//* "License"); you may not use this file except in compliance
+//* with the License.  You may obtain a copy of the License at
+//*
+//*     http://www.apache.org/licenses/LICENSE-2.0
+//*
+//* Unless required by applicable law or agreed to in writing, software
+//* distributed under the License is distributed on an "AS IS" BASIS,
+//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//* See the License for the specific language governing permissions and
+//* limitations under the License.
+//*/
+//
+//package org.apache.hadoop.mapreduce.v2.app2;
+//
+//import static org.mockito.Matchers.anyFloat;
+//import static org.mockito.Matchers.anyInt;
+//import static org.mockito.Matchers.isA;
+//import static org.mockito.Mockito.doCallRealMethod;
+//import static org.mockito.Mockito.doReturn;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.never;
+//import static org.mockito.Mockito.times;
+//import static org.mockito.Mockito.verify;
+//import static org.mockito.Mockito.when;
+//
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.Arrays;
+//import java.util.HashMap;
+//import java.util.HashSet;
+//import java.util.Iterator;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Set;
+//
+//import junit.framework.Assert;
+//
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.mapreduce.MRJobConfig;
+//import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+//import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+//import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+//import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+//import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+//import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobUpdatedNodesEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptKillEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerFailedEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+//import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+//import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+//import org.apache.hadoop.net.NetUtils;
+//import org.apache.hadoop.net.NetworkTopology;
+//import org.apache.hadoop.yarn.ClusterInfo;
+//import org.apache.hadoop.yarn.YarnException;
+//import org.apache.hadoop.yarn.api.AMRMProtocol;
+//import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+//import org.apache.hadoop.yarn.api.records.ApplicationId;
+//import org.apache.hadoop.yarn.api.records.ContainerId;
+//import org.apache.hadoop.yarn.api.records.ContainerState;
+//import org.apache.hadoop.yarn.api.records.ContainerStatus;
+//import org.apache.hadoop.yarn.api.records.Resource;
+//import org.apache.hadoop.yarn.api.records.ResourceRequest;
+//import org.apache.hadoop.yarn.event.Dispatcher;
+//import org.apache.hadoop.yarn.event.DrainDispatcher;
+//import org.apache.hadoop.yarn.event.Event;
+//import org.apache.hadoop.yarn.event.EventHandler;
+//import org.apache.hadoop.yarn.factories.RecordFactory;
+//import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+//import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+//import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+//import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+//import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+//import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+//import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+//import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+//import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+//import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+//import org.apache.hadoop.yarn.util.BuilderUtils;
+//import org.junit.After;
+//import org.junit.Test;
+//
+//@SuppressWarnings("unchecked")
+//public class TestRMContainerAllocator {
+//
+//  static final Log LOG = LogFactory
+//      .getLog(TestRMContainerAllocator.class);
+//  static final RecordFactory recordFactory = RecordFactoryProvider
+//      .getRecordFactory(null);
+//
+//  @After
+//  public void tearDown() {
+//    DefaultMetricsSystem.shutdown();
+//  }
+//
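+//  // Submits three 1024MB container requests targeting h1-h3; verifies nothing
+//  // is assigned until the node managers heartbeat, after which all three are.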
+//  @Test
+//  public void testSimple() throws Exception {
+//
+//    LOG.info("Running testSimple");
+//
+//    Configuration conf = new Configuration();
+//    MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp app = rm.submitApp(1024);
+//    dispatcher.await();
+//
+//    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+//    amNodeManager.nodeHeartbeat(true);
+//    dispatcher.await();
+//
+//    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    dispatcher.await();
+//
+//    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+//    Job mockJob = mock(Job.class);
+//    when(mockJob.getReport()).thenReturn(
+//        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
+//            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+//    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+//        appAttemptId, mockJob);
+//
+//    // add resources to scheduler
+//    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+//    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+//    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+//    dispatcher.await();
+//
+//    // create the container request
+//    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+//        new String[] { "h1" });
+//    allocator.sendRequest(event1);
+//
+//    // send 1 more request with different resource req
+//    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+//        new String[] { "h2" });
+//    allocator.sendRequest(event2);
+//
+//    // this tells the scheduler about the requests
+//    // as nodes are not added, no allocations
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    // send another request with different resource and priority
+//    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+//        new String[] { "h3" });
+//    allocator.sendRequest(event3);
+//
+//    // this tells the scheduler about the requests
+//    // as nodes are not added, no allocations
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    // update resources in scheduler
+//    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+//    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+//    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+//    dispatcher.await();
+//
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+//        assigned, false);
+//  }
+//
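+//  // Same flow with different memory requirements (1024 and 2048); both
+//  // requests should be assigned once the nodes heartbeat.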
+//  @Test
+//  public void testResource() throws Exception {
+//
+//    LOG.info("Running testResource");
+//
+//    Configuration conf = new Configuration();
+//    MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp app = rm.submitApp(1024);
+//    dispatcher.await();
+//
+//    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+//    amNodeManager.nodeHeartbeat(true);
+//    dispatcher.await();
+//
+//    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    dispatcher.await();
+//
+//    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+//    Job mockJob = mock(Job.class);
+//    when(mockJob.getReport()).thenReturn(
+//        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+//            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+//    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+//        appAttemptId, mockJob);
+//
+//    // add resources to scheduler
+//    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+//    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+//    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+//    dispatcher.await();
+//
+//    // create the container request
+//    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+//        new String[] { "h1" });
+//    allocator.sendRequest(event1);
+//
+//    // send 1 more request with different resource req
+//    ContainerRequestEvent event2 = createReq(jobId, 2, 2048,
+//        new String[] { "h2" });
+//    allocator.sendRequest(event2);
+//
+//    // this tells the scheduler about the requests
+//    // as nodes are not added, no allocations
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    // update resources in scheduler
+//    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+//    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+//    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+//    dispatcher.await();
+//
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    checkAssignments(new ContainerRequestEvent[] { event1, event2 },
+//        assigned, false);
+//  }
+//
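+//  // Mixes map and reduce requests; only the two map requests are expected to
+//  // be assigned, and none of them on h1, which lacks the required 2048MB.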
+//  @Test
+//  public void testMapReduceScheduling() throws Exception {
+//
+//    LOG.info("Running testMapReduceScheduling");
+//
+//    Configuration conf = new Configuration();
+//    MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp app = rm.submitApp(1024);
+//    dispatcher.await();
+//
+//    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+//    amNodeManager.nodeHeartbeat(true);
+//    dispatcher.await();
+//
+//    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    dispatcher.await();
+//
+//    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+//    Job mockJob = mock(Job.class);
+//    when(mockJob.getReport()).thenReturn(
+//        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+//            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+//    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+//        appAttemptId, mockJob);
+//
+//    // add resources to scheduler
+//    MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
+//    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+//    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+//    dispatcher.await();
+//
+//    // create the container request
+//    // send MAP request
+//    ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] {
+//        "h1", "h2" }, true, false);
+//    allocator.sendRequest(event1);
+//
+//    // send REDUCE request
+//    ContainerRequestEvent event2 = createReq(jobId, 2, 3000,
+//        new String[] { "h1" }, false, true);
+//    allocator.sendRequest(event2);
+//
+//    // send MAP request
+//    ContainerRequestEvent event3 = createReq(jobId, 3, 2048,
+//        new String[] { "h3" }, false, false);
+//    allocator.sendRequest(event3);
+//
+//    // this tells the scheduler about the requests
+//    // as nodes are not added, no allocations
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    // update resources in scheduler
+//    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+//    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+//    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+//    dispatcher.await();
+//
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    checkAssignments(new ContainerRequestEvent[] { event1, event3 },
+//        assigned, false);
+//
+//    // validate that no container is assigned to h1 as it doesn't have 2048
+//    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+//      Assert.assertFalse("Assigned count not correct", "h1".equals(assig
+//          .getContainer().getNodeId().getHost()));
+//    }
+//  }
+//
+//  private static class MyResourceManager extends MockRM {
+//
+//    public MyResourceManager(Configuration conf) {
+//      super(conf);
+//    }
+//
+//    @Override
+//    protected Dispatcher createDispatcher() {
+//      return new DrainDispatcher();
+//    }
+//
+//    @Override
+//    protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+//      // Dispatch inline for test sanity
+//      return new EventHandler<SchedulerEvent>() {
+//        @Override
+//        public void handle(SchedulerEvent event) {
+//          scheduler.handle(event);
+//        }
+//      };
+//    }
+//    @Override
+//    protected ResourceScheduler createScheduler() {
+//      return new MyFifoScheduler(this.getRMContext());
+//    }
+//  }
+//
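+//  // Runs a 10-map/10-reduce MRApp against the mock RM and checks that job and
+//  // application progress advance as tasks finish, ending at 0.95 with only
+//  // job cleanup remaining.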
+//  @Test
+//  public void testReportedAppProgress() throws Exception {
+//
+//    LOG.info("Running testReportedAppProgress");
+//
+//    Configuration conf = new Configuration();
+//    final MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp rmApp = rm.submitApp(1024);
+//    rmDispatcher.await();
+//
+//    MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
+//    amNodeManager.nodeHeartbeat(true);
+//    rmDispatcher.await();
+//
+//    final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    rmDispatcher.await();
+//
+//    MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+//      appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
+//      @Override
+//      protected Dispatcher createDispatcher() {
+//        return new DrainDispatcher();
+//      }
+//      protected ContainerAllocator createContainerAllocator(
+//          ClientService clientService, AppContext context) {
+//        return new MyContainerAllocator(rm, appAttemptId, context);
+//      };
+//    };
+//
+//    Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
+//
+//    mrApp.submit(conf);
+//    Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
+//        .getValue();
+//
+//    DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+//
+//    MyContainerAllocator allocator = (MyContainerAllocator) mrApp
+//      .getContainerAllocator();
+//
+//    mrApp.waitForState(job, JobState.RUNNING);
+//
+//    amDispatcher.await();
+//    // Wait till all map-attempts request for containers
+//    for (Task t : job.getTasks().values()) {
+//      if (t.getType() == TaskType.MAP) {
+//        mrApp.waitForState(t.getAttempts().values().iterator().next(),
+//          TaskAttemptState.UNASSIGNED);
+//      }
+//    }
+//    amDispatcher.await();
+//
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    amNodeManager.nodeHeartbeat(true);
+//    rmDispatcher.await();
+//    allocator.schedule();
+//    rmDispatcher.await();
+//
+//    // Wait for all map-tasks to be running
+//    for (Task t : job.getTasks().values()) {
+//      if (t.getType() == TaskType.MAP) {
+//        mrApp.waitForState(t, TaskState.RUNNING);
+//      }
+//    }
+//
+//    allocator.schedule(); // Send heartbeat
+//    rmDispatcher.await();
+//    Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
+//
+//    // Finish off 1 map.
+//    Iterator<Task> it = job.getTasks().values().iterator();
+//    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
+//
+//    // Finish off 7 more so that map-progress is 80%
+//    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7);
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
+//
+//    // Finish off the 2 remaining maps
+//    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
+//
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    amNodeManager.nodeHeartbeat(true);
+//    rmDispatcher.await();
+//    allocator.schedule();
+//    rmDispatcher.await();
+//
+//    // Wait for all reduce-tasks to be running
+//    for (Task t : job.getTasks().values()) {
+//      if (t.getType() == TaskType.REDUCE) {
+//        mrApp.waitForState(t, TaskState.RUNNING);
+//      }
+//    }
+//
+//    // Finish off 2 reduces
+//    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
+//
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
+//
+//    // Finish off the remaining 8 reduces.
+//    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8);
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    // Remaining is JobCleanup
+//    Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
+//  }
+//
+//  private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node,
+//      MRApp mrApp, Iterator<Task> it, int nextN) throws Exception {
+//    Task task;
+//    for (int i=0; i<nextN; i++) {
+//      task = it.next();
+//      finishTask(rmDispatcher, node, mrApp, task);
+//    }
+//  }
+//
+//  private void finishTask(DrainDispatcher rmDispatcher, MockNM node,
+//      MRApp mrApp, Task task) throws Exception {
+//    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+//    List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1);
+//    contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(),
+//        ContainerState.COMPLETE, "", 0));
+//    Map<ApplicationId,List<ContainerStatus>> statusUpdate =
+//        new HashMap<ApplicationId,List<ContainerStatus>>(1);
+//    statusUpdate.put(mrApp.getAppID(), contStatus);
+//    node.nodeHeartbeat(statusUpdate, true);
+//    rmDispatcher.await();
+//    mrApp.getContext().getEventHandler().handle(
+//          new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+//    mrApp.waitForState(task, TaskState.SUCCEEDED);
+//  }
+//
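+//  // Same progress check for a map-only job (10 maps, 0 reduces).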
+//  @Test
+//  public void testReportedAppProgressWithOnlyMaps() throws Exception {
+//
+//    LOG.info("Running testReportedAppProgressWithOnlyMaps");
+//
+//    Configuration conf = new Configuration();
+//    final MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp rmApp = rm.submitApp(1024);
+//    rmDispatcher.await();
+//
+//    MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
+//    amNodeManager.nodeHeartbeat(true);
+//    rmDispatcher.await();
+//
+//    final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    rmDispatcher.await();
+//
+//    MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+//      appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
+//      @Override
+//        protected Dispatcher createDispatcher() {
+//          return new DrainDispatcher();
+//        }
+//      protected ContainerAllocator createContainerAllocator(
+//          ClientService clientService, AppContext context) {
+//        return new MyContainerAllocator(rm, appAttemptId, context);
+//      };
+//    };
+//
+//    Assert.assertEquals(0.0, rmApp.getProgress(), 0.0);
+//
+//    mrApp.submit(conf);
+//    Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
+//        .getValue();
+//
+//    DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+//
+//    MyContainerAllocator allocator = (MyContainerAllocator) mrApp
+//      .getContainerAllocator();
+//
+//    mrApp.waitForState(job, JobState.RUNNING);
+//
+//    amDispatcher.await();
+//    // Wait till all map-attempts request for containers
+//    for (Task t : job.getTasks().values()) {
+//      mrApp.waitForState(t.getAttempts().values().iterator().next(),
+//        TaskAttemptState.UNASSIGNED);
+//    }
+//    amDispatcher.await();
+//
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    amNodeManager.nodeHeartbeat(true);
+//    rmDispatcher.await();
+//    allocator.schedule();
+//    rmDispatcher.await();
+//
+//    // Wait for all map-tasks to be running
+//    for (Task t : job.getTasks().values()) {
+//      mrApp.waitForState(t, TaskState.RUNNING);
+//    }
+//
+//    allocator.schedule(); // Send heartbeat
+//    rmDispatcher.await();
+//    Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
+//
+//    Iterator<Task> it = job.getTasks().values().iterator();
+//
+//    // Finish off 1 map so that map-progress is 10%
+//    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
+//
+//    // Finish off 5 more map so that map-progress is 60%
+//    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5);
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
+//
+//    // Finish off remaining map so that map-progress is 100%
+//    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4);
+//    allocator.schedule();
+//    rmDispatcher.await();
+//    Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
+//    Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
+//  }
+//  
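+//  // Assigns an attempt to nm1, then marks both nodes unhealthy and verifies
+//  // that a single JobUpdatedNodesEvent and TaskAttemptKillEvent are reported,
+//  // and that they are not repeated on the next schedule.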
+//  @Test
+//  public void testUpdatedNodes() throws Exception {
+//    Configuration conf = new Configuration();
+//    MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp app = rm.submitApp(1024);
+//    dispatcher.await();
+//    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+//    amNodeManager.nodeHeartbeat(true);
+//    dispatcher.await();
+//
+//    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    dispatcher.await();
+//    
+//    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+//    Job mockJob = mock(Job.class);
+//    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+//        appAttemptId, mockJob);
+//
+//    // add resources to scheduler
+//    MockNM nm1 = rm.registerNode("h1:1234", 10240);
+//    MockNM nm2 = rm.registerNode("h2:1234", 10240);
+//    dispatcher.await();
+//
+//    // create the map container request
+//    ContainerRequestEvent event = createReq(jobId, 1, 1024,
+//        new String[] { "h1" });
+//    allocator.sendRequest(event);
+//    TaskAttemptId attemptId = event.getAttemptID();
+//    
+//    TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
+//    when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId());
+//    Task mockTask = mock(Task.class);
+//    when(mockTask.getAttempt(attemptId)).thenReturn(mockTaskAttempt);
+//    when(mockJob.getTask(attemptId.getTaskId())).thenReturn(mockTask);
+//
+//    // this tells the scheduler about the requests
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    dispatcher.await();
+//
+//    nm1.nodeHeartbeat(true);
+//    dispatcher.await();
+//    // get the assignment
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals(1, assigned.size());
+//    Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId());
+//    // no updated nodes reported
+//    Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
+//    Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
+//    
+//    // mark nodes bad
+//    nm1.nodeHeartbeat(false);
+//    nm2.nodeHeartbeat(false);
+//    dispatcher.await();
+//    
+//    // schedule response returns updated nodes
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals(0, assigned.size());
+//    // updated nodes are reported
+//    Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
+//    Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size());
+//    Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
+//    Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
+//    allocator.getJobUpdatedNodeEvents().clear();
+//    allocator.getTaskAttemptKillEvents().clear();
+//    
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals(0, assigned.size());
+//    // no updated nodes reported
+//    Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
+//    Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
+//  }
+//
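+//  // With blacklisting enabled, fails containers on h1 and h2 and verifies
+//  // that all three requests end up assigned on h3.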
+//  @Test
+//  public void testBlackListedNodes() throws Exception {
+//    
+//    LOG.info("Running testBlackListedNodes");
+//
+//    Configuration conf = new Configuration();
+//    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+//    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+//    conf.setInt(
+//        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+//    
+//    MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp app = rm.submitApp(1024);
+//    dispatcher.await();
+//
+//    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+//    amNodeManager.nodeHeartbeat(true);
+//    dispatcher.await();
+//
+//    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    dispatcher.await();
+//    
+//    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+//    Job mockJob = mock(Job.class);
+//    when(mockJob.getReport()).thenReturn(
+//        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+//            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+//    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+//        appAttemptId, mockJob);
+//
+//    // add resources to scheduler
+//    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+//    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+//    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+//    dispatcher.await();
+//
+//    // create the container request
+//    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+//        new String[] { "h1" });
+//    allocator.sendRequest(event1);
+//
+//    // send 1 more request with different resource req
+//    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+//        new String[] { "h2" });
+//    allocator.sendRequest(event2);
+//
+//    // send another request with different resource and priority
+//    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+//        new String[] { "h3" });
+//    allocator.sendRequest(event3);
+//
+//    // this tells the scheduler about the requests
+//    // as nodes are not added, no allocations
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    // Send events to blacklist nodes h1 and h2
+//    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);            
+//    allocator.sendFailure(f1);
+//    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);            
+//    allocator.sendFailure(f2);
+//
+//    // update resources in scheduler
+//    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+//    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+//    dispatcher.await();
+//
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
+//
+//    // mark h1/h2 as bad nodes
+//    nodeManager1.nodeHeartbeat(false);
+//    nodeManager2.nodeHeartbeat(false);
+//    dispatcher.await();
+//
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
+//
+//    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+//    dispatcher.await();
+//    assigned = allocator.schedule();    
+//    dispatcher.await();
+//        
+//    Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
+//    
+//    // validate that all containers are assigned to h3
+//    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+//      Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
+//          .getContainer().getNodeId().getHost()));
+//    }
+//  }
+//  
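+//  // Exercises the 33% ignore-blacklisting threshold: assignments keep landing
+//  // on blacklisted hosts while the blacklisted fraction exceeds the threshold,
+//  // and stop once enough nodes are registered to re-enable blacklisting.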
+//  @Test
+//  public void testIgnoreBlacklisting() throws Exception {
+//    LOG.info("Running testIgnoreBlacklisting");
+//
+//    Configuration conf = new Configuration();
+//    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+//    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+//    conf.setInt(
+//        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
+//
+//    MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher dispatcher =
+//        (DrainDispatcher) rm.getRMContext().getDispatcher();
+//
+//    // Submit the application
+//    RMApp app = rm.submitApp(1024);
+//    dispatcher.await();
+//
+//    MockNM[] nodeManagers = new MockNM[10];
+//    int nmNum = 0;
+//    List<TaskAttemptContainerAssignedEvent> assigned = null;
+//    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+//    nodeManagers[0].nodeHeartbeat(true);
+//    dispatcher.await();
+//
+//    ApplicationAttemptId appAttemptId =
+//        app.getCurrentAppAttempt().getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    dispatcher.await();
+//
+//    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+//    Job mockJob = mock(Job.class);
+//    when(mockJob.getReport()).thenReturn(
+//        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+//            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+//    MyContainerAllocator allocator =
+//        new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
+//
+//    // Known=1, blacklisted=0, ignore should be false - assign first container
+//    assigned =
+//        getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
+//            nodeManagers[0], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//
+//    LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
+//        + " ignore blacklisting enabled");
+//    // Send events to blacklist nodes h1 and h2
+//    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+//    allocator.sendFailure(f1);
+//
+//    // Test single node.
+//    // Known=1, blacklisted=1, ignore should be true - assign 1
+//    assigned =
+//        getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+//            nodeManagers[0], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//
+//    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+//    // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
+//    assigned =
+//        getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
+//            nodeManagers[1], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//
+//    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+//    // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
+//    assigned =
+//        getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
+//            nodeManagers[2], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//
+//    // Known=3, blacklisted=1, ignore should be true - assign 1
+//    assigned =
+//        getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
+//            nodeManagers[0], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//
+//    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+//    // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
+//    assigned =
+//        getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
+//            nodeManagers[3], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//
+//    // Test blacklisting re-enabled.
+//    // Known=4, blacklisted=1, ignore should be false - no assignment on h1
+//    assigned =
+//        getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
+//            nodeManagers[0], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//    // RMContainerRequestor would have created a replacement request.
+//
+//    // Blacklist h2
+//    ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false);
+//    allocator.sendFailure(f2);
+//
+//    // Test ignore blacklisting re-enabled
+//    // Known=4, blacklisted=2, ignore should be true. Should assign 2
+//    // containers.
+//    assigned =
+//        getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+//            nodeManagers[0], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
+//
+//    // Known=4, blacklisted=2, ignore should be true.
+//    assigned =
+//        getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
+//            nodeManagers[1], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//
+//    // Test blacklist while ignore blacklisting enabled
+//    ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
+//    allocator.sendFailure(f3);
+//
+//    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+//    // Known=5, blacklisted=3, ignore should be true.
+//    assigned =
+//        getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
+//            nodeManagers[2], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//    
+//    // Assign on 5 more nodes - to re-enable blacklisting
+//    for (int i = 0; i < 5; i++) {
+//      nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+//      assigned =
+//          getContainerOnHost(jobId, 11 + i, 1024,
+//              new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
+//              dispatcher, allocator);
+//      Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+//    }
+//
+//    // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
+//    assigned =
+//        getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
+//            nodeManagers[2], dispatcher, allocator);
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//  }
+//
+//  private MockNM registerNodeManager(int i, MyResourceManager rm,
+//      DrainDispatcher dispatcher) throws Exception {
+//    MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240);
+//    dispatcher.await();
+//    return nm;
+//  }
+//
+//  private
+//      List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
+//          int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
+//          DrainDispatcher dispatcher, MyContainerAllocator allocator)
+//          throws Exception {
+//    ContainerRequestEvent reqEvent =
+//        createReq(jobId, taskAttemptId, memory, hosts);
+//    allocator.sendRequest(reqEvent);
+//
+//    // Send the request to the RM
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    // Heartbeat from the required nodeManager
+//    mockNM.nodeHeartbeat(true);
+//    dispatcher.await();
+//
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    return assigned;
+//  }
+// 
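+//  // Blacklists h1 after a container failure and verifies that the fast-fail
+//  // retry and a later request are both re-scheduled onto h3.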
+//  @Test
+//  public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
+//    LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
+//
+//    Configuration conf = new Configuration();
+//    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+//    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+//    conf.setInt(
+//        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+//    
+//    MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp app = rm.submitApp(1024);
+//    dispatcher.await();
+//
+//    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+//    amNodeManager.nodeHeartbeat(true);
+//    dispatcher.await();
+//
+//    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    rm.sendAMLaunched(appAttemptId);
+//    dispatcher.await();
+//    
+//    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+//    Job mockJob = mock(Job.class);
+//    when(mockJob.getReport()).thenReturn(
+//        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+//            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+//    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+//        appAttemptId, mockJob);
+//
+//    // add resources to scheduler
+//    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+//    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+//    dispatcher.await();
+//
+//    LOG.info("Requesting 1 Containers _1 on H1");
+//    // create the container request
+//    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+//        new String[] { "h1" });
+//    allocator.sendRequest(event1);
+//
+//    LOG.info("RM Heartbeat (to send the container requests)");
+//    // this tells the scheduler about the requests
+//    // as nodes are not added, no allocations
+//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    LOG.info("h1 Heartbeat (To actually schedule the containers)");
+//    // update resources in scheduler
+//    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+//    dispatcher.await();
+//    
+//    LOG.info("RM Heartbeat (To process the scheduled containers)");
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());    
+//    
+//    LOG.info("Failing container _1 on H1 (should blacklist the node)");
+//    // Send events to blacklist nodes h1 and h2
+//    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+//    allocator.sendFailure(f1);
+//
+//    //At this stage, a request should be created for a fast fail map
+//    //Create a FAST_FAIL request for a previously failed map.
+//    ContainerRequestEvent event1f = createReq(jobId, 1, 1024,
+//        new String[] { "h1" }, true, false);
+//    allocator.sendRequest(event1f);
+//
+//    //Update the Scheduler with the new requests.
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//
+//    // send another request with different resource and priority
+//    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+//        new String[] { "h1", "h3" });
+//    allocator.sendRequest(event3);
+//    
+//    //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container.
+//    //RM is only aware of the prio:5 container
+//    
+//    LOG.info("h1 Heartbeat (To actually schedule the containers)");
+//    // update resources in scheduler
+//    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+//    dispatcher.await();
+//    
+//    LOG.info("RM Heartbeat (To process the scheduled containers)");
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
+//    
+//    //RMContainerAllocator gets assigned a p:5 on a blacklisted node.
+//
+//    //Send a release for the p:5 container + another request.
+//    LOG.info("RM Heartbeat (To process the re-scheduled containers)");
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+//    
+//    // Heartbeat from H3 to schedule on this host.
+//    LOG.info("h3 Heartbeat (To re-schedule the containers)");
+//    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+//    dispatcher.await();
+//    
+//    LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
+//    assigned = allocator.schedule();
+//    dispatcher.await();
+//     
+//    // For debugging
+//    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+//      LOG.info(assig.getTaskAttemptID() +
+//          " assgined to " + assig.getContainer().getId() +
+//          " with priority " + assig.getContainer().getPriority());
+//    }
+//    
+//    Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
+//    
+//    // validate that all containers are assigned to h3
+//    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+//      Assert.assertEquals("Assigned container " + assig.getContainer().getId()
+//          + " host not correct", "h3", assig.getContainer().getNodeId().getHost());
+//    }
+//  }
+//  
+//  private static class MyFifoScheduler extends FifoScheduler {
+//
+//    public MyFifoScheduler(RMContext rmContext) {
+//      super();
+//      try {
+//        Configuration conf = new Configuration();
+//        reinitialize(conf, new ContainerTokenSecretManager(conf),
+//            rmContext);
+//      } catch (IOException ie) {
+//        LOG.info("add application failed with ", ie);
+//        assert (false);
+//      }
+//    }
+//
+//    // override this to copy the objects otherwise FifoScheduler updates the
+//    // numContainers in same objects as kept by RMContainerAllocator
+//    @Override
+//    public synchronized Allocation allocate(
+//        ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+//        List<ContainerId> release) {
+//      List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+//      for (ResourceRequest req : ask) {
+//        ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req
+//            .getPriority(), req.getHostName(), req.getCapability(), req
+//            .getNumContainers());
+//        askCopy.add(reqCopy);
+//      }
+//      return super.allocate(applicationAttemptId, askCopy, release);
+//    }
+//  }
+//
+//  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+//      int memory, String[] hosts) {
+//    return createReq(jobId, taskAttemptId, memory, hosts, false, false);
+//  }
+//
+//  private ContainerRequestEvent
+//      createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts,
+//          boolean earlierFailedAttempt, boolean reduce) {
+//    TaskId taskId;
+//    if (reduce) {
+//      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+//    } else {
+//      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+//    }
+//    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+//        taskAttemptId);
+//    Resource containerNeed = BuilderUtils.newResource(memory);
+//    if (earlierFailedAttempt) {
+//      return ContainerRequestEvent
+//          .createContainerRequestEventForFailedContainer(attemptId,
+//              containerNeed);
+//    }
+//    return new ContainerRequestEvent(attemptId, containerNeed, hosts,
+//        new String[] { NetworkTopology.DEFAULT_RACK });
+//  }
+//
+//  private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
+//      String host, boolean reduce) {
+//    TaskId taskId;
+//    if (reduce) {
+//      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+//    } else {
+//      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+//    }
+//    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+//        taskAttemptId);
+//    return new ContainerFailedEvent(attemptId, host);    
+//  }
+//  
+//  private void checkAssignments(ContainerRequestEvent[] requests,
+//      List<TaskAttemptContainerAssignedEvent> assignments,
+//      boolean checkHostMatch) {
+//    Assert.assertNotNull("Container not assigned", assignments);
+//    Assert.assertEquals("Assigned count not correct", requests.length,
+//        assignments.size());
+//
+//    // check for uniqueness of containerIDs
+//    Set<ContainerId> containerIds = new HashSet<ContainerId>();
+//    for (TaskAttemptContainerAssignedEvent assigned : assignments) {
+//      containerIds.add(assigned.getContainer().getId());
+//    }
+//    Assert.assertEquals("Assigned containers must be different", assignments
+//        .size(), containerIds.size());
+//
+//    // check for all assignment
+//    for (ContainerRequestEvent req : requests) {
+//      TaskAttemptContainerAssignedEvent assigned = null;
+//      for (TaskAttemptContainerAssignedEvent ass : assignments) {
+//        if (ass.getTaskAttemptID().equals(req.getAttemptID())) {
+//          assigned = ass;
+//          break;
+//        }
+//      }
+//      checkAssignment(req, assigned, checkHostMatch);
+//    }
+//  }
+//
+//  private void checkAssignment(ContainerRequestEvent request,
+//      TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
+//    Assert.assertNotNull("Nothing assigned to attempt "
+//        + request.getAttemptID(), assigned);
+//    Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
+//        assigned.getTaskAttemptID());
+//    if (checkHostMatch) {
+//      Assert.assertTrue("Not assigned to requested host", Arrays.asList(
+//          request.getHosts()).contains(
+//          assigned.getContainer().getNodeId().toString()));
+//    }
+//  }
+//
+//  // Mock RMContainerAllocator
+//  // Instead of talking to a remote scheduler, it uses the local scheduler
+//  private static class MyContainerAllocator extends RMContainerAllocator {
+//    static final List<TaskAttemptContainerAssignedEvent> events
+//      = new ArrayList<TaskAttemptContainerAssignedEvent>();
+//    static final List<TaskAttemptKillEvent> taskAttemptKillEvents 
+//      = new ArrayList<TaskAttemptKillEvent>();
+//    static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents 
+//    = new ArrayList<JobUpdatedNodesEvent>();
+//    private MyResourceManager rm;
+//
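+//    // Builds a mocked AppContext whose event handler captures container-assigned, task-attempt-kill and
+//    // job-updated-nodes events so the tests below can assert on them.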
+//    private static AppContext createAppContext(
+//        ApplicationAttemptId appAttemptId, Job job) {
+//      AppContext context = mock(AppContext.class);
+//      ApplicationId appId = appAttemptId.getApplicationId();
+//      when(context.getApplicationID()).thenReturn(appId);
+//      when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
+//      when(context.getJob(isA(JobId.class))).thenReturn(job);
+//      when(context.getClusterInfo()).thenReturn(
+//          new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
+//              .newResource(10240)));
+//      when(context.getEventHandler()).thenReturn(new EventHandler() {
+//        @Override
+//        public void handle(Event event) {
+//          // Only capture interesting events.
+//          if (event instanceof TaskAttemptContainerAssignedEvent) {
+//            events.add((TaskAttemptContainerAssignedEvent) event);
+//          } else if (event instanceof TaskAttemptKillEvent) {
+//            taskAttemptKillEvents.add((TaskAttemptKillEvent)event);
+//          } else if (event instanceof JobUpdatedNodesEvent) {
+//            jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event);
+//          }
+//        }
+//      });
+//      return context;
+//    }
+//
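+//    // Minimal ClientService stub providing just a bind address and HTTP port for AM registration.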
+//    private static ClientService createMockClientService() {
+//      ClientService service = mock(ClientService.class);
+//      when(service.getBindAddress()).thenReturn(
+//          NetUtils.createSocketAddr("localhost:4567"));
+//      when(service.getHttpPort()).thenReturn(890);
+//      return service;
+//    }
+//
+//    // Use this constructor when using a real job.
+//    MyContainerAllocator(MyResourceManager rm,
+//        ApplicationAttemptId appAttemptId, AppContext context) {
+//      super(createMockClientService(), context);
+//      this.rm = rm;
+//    }
+//
+//    // Use this constructor when you are using a mocked job.
+//    public MyContainerAllocator(MyResourceManager rm, Configuration conf,
+//        ApplicationAttemptId appAttemptId, Job job) {
+//      super(createMockClientService(), createAppContext(appAttemptId, job));
+//      this.rm = rm;
+//      super.init(conf);
+//      super.start();
+//    }
+//
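+//    // Returns the in-process RM's ApplicationMasterService directly instead of creating an RPC proxy.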
+//    @Override
+//    protected AMRMProtocol createSchedulerProxy() {
+//      return this.rm.getApplicationMasterService();
+//    }
+//
+//    @Override
+//    protected void register() {
+//      super.register();
+//    }
+//
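+//    // Intentionally a no-op: the mock never unregisters from the RM.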
+//    @Override
+//    protected void unregister() {
+//    }
+//
+//    @Override
+//    protected Resource getMinContainerCapability() {
+//      return BuilderUtils.newResource(1024);
+//    }
+//
+//    @Override
+//    protected Resource getMaxContainerCapability() {
+//      return BuilderUtils.newResource(10240);
+//    }
+//
+//    public void sendRequest(ContainerRequestEvent req) {
+//      sendRequests(Arrays.asList(new ContainerRequestEvent[] { req }));
+//    }
+//
+//    public void sendRequests(List<ContainerRequestEvent> reqs) {
+//      for (ContainerRequestEvent req : reqs) {
+//        super.handleEvent(req);
+//      }
+//    }
+//
+//    public void sendFailure(ContainerFailedEvent f) {
+//      super.handleEvent(f);
+//    }
+//    
+//    // API to be used by tests
+//    public List<TaskAttemptContainerAssignedEvent> schedule() {
+//      // run the scheduler
+//      try {
+//        super.heartbeat();
+//      } catch (Exception e) {
+//        LOG.error("error in heartbeat ", e);
+//        throw new YarnException(e);
+//      }
+//
+//      List<TaskAttemptContainerAssignedEvent> result
+//        = new ArrayList<TaskAttemptContainerAssignedEvent>(events);
+//      events.clear();
+//      return result;
+//    }
+//    
+//    List<TaskAttemptKillEvent> getTaskAttemptKillEvents() {
+//      return taskAttemptKillEvents;
+//    }
+//    
+//    List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() {
+//      return jobUpdatedNodeEvents;
+//    }
+//
+//    @Override
+//    protected void startAllocatorThread() {
+//      // override to NOT start thread
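+//      // Heartbeats are driven synchronously by the tests through schedule() instead.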
+//    }
+//        
+//  }
+//
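+//  // Verifies scheduleReduces(): slow-start keeps reduces from starting until enough maps have succeeded,
+//  // then reduces are ramped up or down depending on the available memory limit.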
+//  @Test
+//  public void testReduceScheduling() throws Exception {
+//    int totalMaps = 10;
+//    int succeededMaps = 1;
+//    int scheduledMaps = 10;
+//    int scheduledReduces = 0;
+//    int assignedMaps = 2;
+//    int assignedReduces = 0;
+//    int mapResourceReqt = 1024;
+//    int reduceResourceReqt = 2*1024;
+//    int numPendingReduces = 4;
+//    float maxReduceRampupLimit = 0.5f;
+//    float reduceSlowStart = 0.2f;
+//    
+//    RMContainerAllocator allocator = mock(RMContainerAllocator.class);
+//    doCallRealMethod().when(allocator).
+//        scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
+//            anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
+//    
+//    // Test slow-start
+//    allocator.scheduleReduces(
+//        totalMaps, succeededMaps, 
+//        scheduledMaps, scheduledReduces, 
+//        assignedMaps, assignedReduces, 
+//        mapResourceReqt, reduceResourceReqt, 
+//        numPendingReduces, 
+//        maxReduceRampupLimit, reduceSlowStart);
+//    verify(allocator, never()).setIsReduceStarted(true);
+//    
+//    // verify slow-start still in effect when no more maps need to
+//    // be scheduled but some have yet to complete
+//    allocator.scheduleReduces(
+//        totalMaps, succeededMaps,
+//        0, scheduledReduces,
+//        totalMaps - succeededMaps, assignedReduces,
+//        mapResourceReqt, reduceResourceReqt,
+//        numPendingReduces,
+//        maxReduceRampupLimit, reduceSlowStart);
+//    verify(allocator, never()).setIsReduceStarted(true);
+//    verify(allocator, never()).scheduleAllReduces();
+//
+//    succeededMaps = 3;
+//    allocator.scheduleReduces(
+//        totalMaps, succeededMaps, 
+//        scheduledMaps, scheduledReduces, 
+//        assignedMaps, assignedReduces, 
+//        mapResourceReqt, reduceResourceReqt, 
+//        numPendingReduces, 
+//        maxReduceRampupLimit, reduceSlowStart);
+//    verify(allocator, times(1)).setIsReduceStarted(true);
+//    
+//    // Test reduce ramp-up
+//    doReturn(100 * 1024).when(allocator).getMemLimit();
+//    allocator.scheduleReduces(
+//        totalMaps, succeededMaps, 
+//        scheduledMaps, scheduledReduces, 
+//        assignedMaps, assignedReduces, 
+//        mapResourceReqt, reduceResourceReqt, 
+//        numPendingReduces, 
+//        maxReduceRampupLimit, reduceSlowStart);
+//    verify(allocator).rampUpReduces(anyInt());
+//    verify(allocator, never()).rampDownReduces(anyInt());
+//
+//    // Test reduce ramp-down
+//    scheduledReduces = 3;
+//    doReturn(10 * 1024).when(allocator).getMemLimit();
+//    allocator.scheduleReduces(
+//        totalMaps, succeededMaps, 
+//        scheduledMaps, scheduledReduces, 
+//        assignedMaps, assignedReduces, 
+//        mapResourceReqt, reduceResourceReqt, 
+//        numPendingReduces, 
+//        maxReduceRampupLimit, reduceSlowStart);
+//    verify(allocator).rampDownReduces(anyInt());
+//  }
+//
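+//  // Allocator that merely records whether scheduleReduces() was called, so a test can detect when the
+//  // reduce schedule is recalculated.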
+//  private static class RecalculateContainerAllocator extends MyContainerAllocator {
+//    public boolean recalculatedReduceSchedule = false;
+//
+//    public RecalculateContainerAllocator(MyResourceManager rm,
+//        Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+//      super(rm, conf, appAttemptId, job);
+//    }
+//
+//    @Override
+//    public void scheduleReduces(int totalMaps, int completedMaps,
+//        int scheduledMaps, int scheduledReduces, int assignedMaps,
+//        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+//        int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+//      recalculatedReduceSchedule = true;
+//    }
+//  }
+//  
+//  @Test
+//  public void testCompletedTasksRecalculateSchedule() throws Exception {
+//    LOG.info("Running testCompletedTasksRecalculateSchedule");
+//
+//    Configuration conf = new Configuration();
+//    final MyResourceManager rm = new MyResourceManager(conf);
+//    rm.start();
+//    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+//        .getDispatcher();
+//
+//    // Submit the application
+//    RMApp app = rm.submitApp(1024);
+//    dispatcher.await();
+//
+//    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+//        .getAppAttemptId();
+//    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+//    Job job = mock(Job.class);
+//    when(job.getReport()).thenReturn(
+//        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+//            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+//    doReturn(10).when(job).getTotalMaps();
+//    doReturn(10).when(job).getTotalReduces();
+//    doReturn(0).when(job).getCompletedMaps();
+//    RecalculateContainerAllocator allocator =
+//        new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+//    allocator.schedule();
+//
+//    allocator.recalculatedReduceSchedule = false;
+//    allocator.schedule();
+//    Assert.assertFalse("Unexpected recalculate of reduce schedule",
+//        allocator.recalculatedReduceSchedule);
+//
+//    doReturn(1).when(job).getCompletedMaps();
+//    allocator.schedule();
+//    Assert.assertTrue("Expected recalculate of reduce schedule",
+//        allocator.recalculatedReduceSchedule);
+//  }
+//
+//  public static void main(String[] args) throws Exception {
+//    TestRMContainerAllocator t = new TestRMContainerAllocator();
+//    t.testSimple();
+//    t.testResource();
+//    t.testMapReduceScheduling();
+//    t.testReportedAppProgress();
+//    t.testReportedAppProgressWithOnlyMaps();
+//    t.testBlackListedNodes();
+//    t.testCompletedTasksRecalculateSchedule();
+//  }
+//
+//}


