Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1B349D606 for ; Wed, 22 Aug 2012 22:13:40 +0000 (UTC) Received: (qmail 98832 invoked by uid 500); 22 Aug 2012 22:13:39 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 98765 invoked by uid 500); 22 Aug 2012 22:13:39 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 98757 invoked by uid 99); 22 Aug 2012 22:13:39 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Aug 2012 22:13:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Aug 2012 22:13:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id CE8B12388C52; Wed, 22 Aug 2012 22:11:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1376283 [17/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/s... Date: Wed, 22 Aug 2012 22:11:48 -0000 To: mapreduce-commits@hadoop.apache.org From: sseth@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120822221158.CE8B12388C52@eris.apache.org> Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRClientService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRClientService.java?rev=1376283&view=auto ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRClientService.java (added) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRClientService.java Wed Aug 22 22:11:39 2012 @@ -0,0 +1,215 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app2; + +import java.util.Iterator; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.Phase; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app2.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app2.client.MRClientService; +import org.apache.hadoop.mapreduce.v2.app2.job.Job; +import org.apache.hadoop.mapreduce.v2.app2.job.Task; +import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.junit.Test; + +public class TestMRClientService { + + private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + @Test + public void test() throws Exception { + MRAppWithClientService app = new MRAppWithClientService(1, 0, false); + Configuration conf = new Configuration(); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task task = it.next(); + app.waitForState(task, TaskState.RUNNING); + TaskAttempt attempt = task.getAttempts().values().iterator().next(); + app.waitForState(attempt, TaskAttemptState.RUNNING); + + // send the diagnostic + String diagnostic1 = "Diagnostic1"; + String diagnostic2 = "Diagnostic2"; + app.getContext().getEventHandler().handle( + new TaskAttemptDiagnosticsUpdateEvent(attempt.getID(), diagnostic1)); + + // send the status update + TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); + taskAttemptStatus.id = attempt.getID(); + taskAttemptStatus.progress = 0.5f; + taskAttemptStatus.stateString = "RUNNING"; + taskAttemptStatus.taskState = TaskAttemptState.RUNNING; + taskAttemptStatus.phase = Phase.MAP; + taskAttemptStatus.outputSize = 3; + // send the status update + app.getContext().getEventHandler().handle( + new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus)); + + + //verify that all object are fully populated by invoking RPCs. + YarnRPC rpc = YarnRPC.create(conf); + MRClientProtocol proxy = + (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, + app.clientService.getBindAddress(), conf); + GetCountersRequest gcRequest = + recordFactory.newRecordInstance(GetCountersRequest.class); + gcRequest.setJobId(job.getID()); + Assert.assertNotNull("Counters is null", + proxy.getCounters(gcRequest).getCounters()); + + GetJobReportRequest gjrRequest = + recordFactory.newRecordInstance(GetJobReportRequest.class); + gjrRequest.setJobId(job.getID()); + JobReport jr = proxy.getJobReport(gjrRequest).getJobReport(); + verifyJobReport(jr); + + + GetTaskAttemptCompletionEventsRequest gtaceRequest = + recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class); + gtaceRequest.setJobId(job.getID()); + gtaceRequest.setFromEventId(0); + gtaceRequest.setMaxEvents(10); + Assert.assertNotNull("TaskCompletionEvents is null", + proxy.getTaskAttemptCompletionEvents(gtaceRequest).getCompletionEventList()); + + GetDiagnosticsRequest gdRequest = + recordFactory.newRecordInstance(GetDiagnosticsRequest.class); + gdRequest.setTaskAttemptId(attempt.getID()); + Assert.assertNotNull("Diagnostics is null", + proxy.getDiagnostics(gdRequest).getDiagnosticsList()); + + GetTaskAttemptReportRequest gtarRequest = + recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class); + gtarRequest.setTaskAttemptId(attempt.getID()); + TaskAttemptReport tar = + proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport(); + verifyTaskAttemptReport(tar); + + + GetTaskReportRequest gtrRequest = + recordFactory.newRecordInstance(GetTaskReportRequest.class); + gtrRequest.setTaskId(task.getID()); + Assert.assertNotNull("TaskReport is null", + proxy.getTaskReport(gtrRequest).getTaskReport()); + + GetTaskReportsRequest gtreportsRequest = + recordFactory.newRecordInstance(GetTaskReportsRequest.class); + gtreportsRequest.setJobId(job.getID()); + gtreportsRequest.setTaskType(TaskType.MAP); + Assert.assertNotNull("TaskReports for map is null", + proxy.getTaskReports(gtreportsRequest).getTaskReportList()); + + gtreportsRequest = + recordFactory.newRecordInstance(GetTaskReportsRequest.class); + gtreportsRequest.setJobId(job.getID()); + gtreportsRequest.setTaskType(TaskType.REDUCE); + Assert.assertNotNull("TaskReports for reduce is null", + proxy.getTaskReports(gtreportsRequest).getTaskReportList()); + + List diag = proxy.getDiagnostics(gdRequest).getDiagnosticsList(); + Assert.assertEquals("Num diagnostics not correct", 1 , diag.size()); + Assert.assertEquals("Diag 1 not correct", + diagnostic1, diag.get(0).toString()); + + TaskReport taskReport = proxy.getTaskReport(gtrRequest).getTaskReport(); + Assert.assertEquals("Num diagnostics not correct", 1, + taskReport.getDiagnosticsCount()); + + //send the done signal to the task + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + task.getAttempts().values().iterator().next().getID(), + TaskAttemptEventType.TA_DONE)); + + app.waitForState(job, JobState.SUCCEEDED); + } + + private void verifyJobReport(JobReport jr) { + Assert.assertNotNull("JobReport is null", jr); + List amInfos = jr.getAMInfos(); + Assert.assertEquals(1, amInfos.size()); + Assert.assertEquals(JobState.RUNNING, jr.getJobState()); + AMInfo amInfo = amInfos.get(0); + Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost()); + Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort()); + Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort()); + Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId()); + Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId() + .getAttemptId()); + Assert.assertTrue(amInfo.getStartTime() > 0); + Assert.assertEquals(false, jr.isUber()); + } + + private void verifyTaskAttemptReport(TaskAttemptReport tar) { + Assert.assertEquals(TaskAttemptState.RUNNING, tar.getTaskAttemptState()); + Assert.assertNotNull("TaskAttemptReport is null", tar); + Assert.assertEquals(MRApp.NM_HOST, tar.getNodeManagerHost()); + Assert.assertEquals(MRApp.NM_PORT, tar.getNodeManagerPort()); + Assert.assertEquals(MRApp.NM_HTTP_PORT, tar.getNodeManagerHttpPort()); + Assert.assertEquals(1, tar.getContainerId().getApplicationAttemptId() + .getAttemptId()); + } + + class MRAppWithClientService extends MRApp { + MRClientService clientService = null; + MRAppWithClientService(int maps, int reduces, boolean autoComplete) { + super(maps, reduces, autoComplete, "MRAppWithClientService", true); + } + @Override + protected ClientService createClientService(AppContext context) { + clientService = new MRClientService(context); + return clientService; + } + } + + public static void main(String[] args) throws Exception { + TestMRClientService t = new TestMRClientService(); + t.test(); + } +} Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java?rev=1376283&view=auto ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java (added) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRMContainerAllocator.java Wed Aug 22 22:11:39 2012 @@ -0,0 +1,1480 @@ +///** +//* Licensed to the Apache Software Foundation (ASF) under one +//* or more contributor license agreements. See the NOTICE file +//* distributed with this work for additional information +//* regarding copyright ownership. The ASF licenses this file +//* to you under the Apache License, Version 2.0 (the +//* "License"); you may not use this file except in compliance +//* with the License. You may obtain a copy of the License at +//* +//* http://www.apache.org/licenses/LICENSE-2.0 +//* +//* Unless required by applicable law or agreed to in writing, software +//* distributed under the License is distributed on an "AS IS" BASIS, +//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//* See the License for the specific language governing permissions and +//* limitations under the License. +//*/ +// +//package org.apache.hadoop.mapreduce.v2.app2; +// +//import static org.mockito.Matchers.anyFloat; +//import static org.mockito.Matchers.anyInt; +//import static org.mockito.Matchers.isA; +//import static org.mockito.Mockito.doCallRealMethod; +//import static org.mockito.Mockito.doReturn; +//import static org.mockito.Mockito.mock; +//import static org.mockito.Mockito.never; +//import static org.mockito.Mockito.times; +//import static org.mockito.Mockito.verify; +//import static org.mockito.Mockito.when; +// +//import java.io.IOException; +//import java.util.ArrayList; +//import java.util.Arrays; +//import java.util.HashMap; +//import java.util.HashSet; +//import java.util.Iterator; +//import java.util.List; +//import java.util.Map; +//import java.util.Set; +// +//import junit.framework.Assert; +// +//import org.apache.commons.logging.Log; +//import org.apache.commons.logging.LogFactory; +//import org.apache.hadoop.conf.Configuration; +//import org.apache.hadoop.mapreduce.MRJobConfig; +//import org.apache.hadoop.mapreduce.v2.api.records.JobId; +//import org.apache.hadoop.mapreduce.v2.api.records.JobState; +//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +//import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +//import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +//import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +//import org.apache.hadoop.mapreduce.v2.app2.client.ClientService; +//import org.apache.hadoop.mapreduce.v2.app2.job.Job; +//import org.apache.hadoop.mapreduce.v2.app2.job.Task; +//import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt; +//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobUpdatedNodesEvent; +//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent; +//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent; +//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType; +//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptKillEvent; +//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator; +//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerFailedEvent; +//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestEvent; +//import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator; +//import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +//import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +//import org.apache.hadoop.net.NetUtils; +//import org.apache.hadoop.net.NetworkTopology; +//import org.apache.hadoop.yarn.ClusterInfo; +//import org.apache.hadoop.yarn.YarnException; +//import org.apache.hadoop.yarn.api.AMRMProtocol; +//import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +//import org.apache.hadoop.yarn.api.records.ApplicationId; +//import org.apache.hadoop.yarn.api.records.ContainerId; +//import org.apache.hadoop.yarn.api.records.ContainerState; +//import org.apache.hadoop.yarn.api.records.ContainerStatus; +//import org.apache.hadoop.yarn.api.records.Resource; +//import org.apache.hadoop.yarn.api.records.ResourceRequest; +//import org.apache.hadoop.yarn.event.Dispatcher; +//import org.apache.hadoop.yarn.event.DrainDispatcher; +//import org.apache.hadoop.yarn.event.Event; +//import org.apache.hadoop.yarn.event.EventHandler; +//import org.apache.hadoop.yarn.factories.RecordFactory; +//import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +//import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +//import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +//import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +//import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +//import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +//import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +//import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +//import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +//import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; +//import org.apache.hadoop.yarn.util.BuilderUtils; +//import org.junit.After; +//import org.junit.Test; +// +//@SuppressWarnings("unchecked") +//public class TestRMContainerAllocator { +// +// static final Log LOG = LogFactory +// .getLog(TestRMContainerAllocator.class); +// static final RecordFactory recordFactory = RecordFactoryProvider +// .getRecordFactory(null); +// +// @After +// public void tearDown() { +// DefaultMetricsSystem.shutdown(); +// } +// +// @Test +// public void testSimple() throws Exception { +// +// LOG.info("Running testSimple"); +// +// Configuration conf = new Configuration(); +// MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp app = rm.submitApp(1024); +// dispatcher.await(); +// +// MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); +// amNodeManager.nodeHeartbeat(true); +// dispatcher.await(); +// +// ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() +// .getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// dispatcher.await(); +// +// JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); +// Job mockJob = mock(Job.class); +// when(mockJob.getReport()).thenReturn( +// MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, +// 0, 0, 0, 0, 0, 0, "jobfile", null, false)); +// MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, +// appAttemptId, mockJob); +// +// // add resources to scheduler +// MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); +// MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); +// MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); +// dispatcher.await(); +// +// // create the container request +// ContainerRequestEvent event1 = createReq(jobId, 1, 1024, +// new String[] { "h1" }); +// allocator.sendRequest(event1); +// +// // send 1 more request with different resource req +// ContainerRequestEvent event2 = createReq(jobId, 2, 1024, +// new String[] { "h2" }); +// allocator.sendRequest(event2); +// +// // this tells the scheduler about the requests +// // as nodes are not added, no allocations +// List assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// // send another request with different resource and priority +// ContainerRequestEvent event3 = createReq(jobId, 3, 1024, +// new String[] { "h3" }); +// allocator.sendRequest(event3); +// +// // this tells the scheduler about the requests +// // as nodes are not added, no allocations +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// // update resources in scheduler +// nodeManager1.nodeHeartbeat(true); // Node heartbeat +// nodeManager2.nodeHeartbeat(true); // Node heartbeat +// nodeManager3.nodeHeartbeat(true); // Node heartbeat +// dispatcher.await(); +// +// assigned = allocator.schedule(); +// dispatcher.await(); +// checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, +// assigned, false); +// } +// +// @Test +// public void testResource() throws Exception { +// +// LOG.info("Running testResource"); +// +// Configuration conf = new Configuration(); +// MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp app = rm.submitApp(1024); +// dispatcher.await(); +// +// MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); +// amNodeManager.nodeHeartbeat(true); +// dispatcher.await(); +// +// ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() +// .getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// dispatcher.await(); +// +// JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); +// Job mockJob = mock(Job.class); +// when(mockJob.getReport()).thenReturn( +// MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, +// 0, 0, 0, 0, 0, 0, "jobfile", null, false)); +// MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, +// appAttemptId, mockJob); +// +// // add resources to scheduler +// MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); +// MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); +// MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); +// dispatcher.await(); +// +// // create the container request +// ContainerRequestEvent event1 = createReq(jobId, 1, 1024, +// new String[] { "h1" }); +// allocator.sendRequest(event1); +// +// // send 1 more request with different resource req +// ContainerRequestEvent event2 = createReq(jobId, 2, 2048, +// new String[] { "h2" }); +// allocator.sendRequest(event2); +// +// // this tells the scheduler about the requests +// // as nodes are not added, no allocations +// List assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// // update resources in scheduler +// nodeManager1.nodeHeartbeat(true); // Node heartbeat +// nodeManager2.nodeHeartbeat(true); // Node heartbeat +// nodeManager3.nodeHeartbeat(true); // Node heartbeat +// dispatcher.await(); +// +// assigned = allocator.schedule(); +// dispatcher.await(); +// checkAssignments(new ContainerRequestEvent[] { event1, event2 }, +// assigned, false); +// } +// +// @Test +// public void testMapReduceScheduling() throws Exception { +// +// LOG.info("Running testMapReduceScheduling"); +// +// Configuration conf = new Configuration(); +// MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp app = rm.submitApp(1024); +// dispatcher.await(); +// +// MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); +// amNodeManager.nodeHeartbeat(true); +// dispatcher.await(); +// +// ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() +// .getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// dispatcher.await(); +// +// JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); +// Job mockJob = mock(Job.class); +// when(mockJob.getReport()).thenReturn( +// MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, +// 0, 0, 0, 0, 0, 0, "jobfile", null, false)); +// MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, +// appAttemptId, mockJob); +// +// // add resources to scheduler +// MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); +// MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); +// MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); +// dispatcher.await(); +// +// // create the container request +// // send MAP request +// ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { +// "h1", "h2" }, true, false); +// allocator.sendRequest(event1); +// +// // send REDUCE request +// ContainerRequestEvent event2 = createReq(jobId, 2, 3000, +// new String[] { "h1" }, false, true); +// allocator.sendRequest(event2); +// +// // send MAP request +// ContainerRequestEvent event3 = createReq(jobId, 3, 2048, +// new String[] { "h3" }, false, false); +// allocator.sendRequest(event3); +// +// // this tells the scheduler about the requests +// // as nodes are not added, no allocations +// List assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// // update resources in scheduler +// nodeManager1.nodeHeartbeat(true); // Node heartbeat +// nodeManager2.nodeHeartbeat(true); // Node heartbeat +// nodeManager3.nodeHeartbeat(true); // Node heartbeat +// dispatcher.await(); +// +// assigned = allocator.schedule(); +// dispatcher.await(); +// checkAssignments(new ContainerRequestEvent[] { event1, event3 }, +// assigned, false); +// +// // validate that no container is assigned to h1 as it doesn't have 2048 +// for (TaskAttemptContainerAssignedEvent assig : assigned) { +// Assert.assertFalse("Assigned count not correct", "h1".equals(assig +// .getContainer().getNodeId().getHost())); +// } +// } +// +// private static class MyResourceManager extends MockRM { +// +// public MyResourceManager(Configuration conf) { +// super(conf); +// } +// +// @Override +// protected Dispatcher createDispatcher() { +// return new DrainDispatcher(); +// } +// +// @Override +// protected EventHandler createSchedulerEventDispatcher() { +// // Dispatch inline for test sanity +// return new EventHandler() { +// @Override +// public void handle(SchedulerEvent event) { +// scheduler.handle(event); +// } +// }; +// } +// @Override +// protected ResourceScheduler createScheduler() { +// return new MyFifoScheduler(this.getRMContext()); +// } +// } +// +// @Test +// public void testReportedAppProgress() throws Exception { +// +// LOG.info("Running testReportedAppProgress"); +// +// Configuration conf = new Configuration(); +// final MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp rmApp = rm.submitApp(1024); +// rmDispatcher.await(); +// +// MockNM amNodeManager = rm.registerNode("amNM:1234", 21504); +// amNodeManager.nodeHeartbeat(true); +// rmDispatcher.await(); +// +// final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() +// .getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// rmDispatcher.await(); +// +// MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId( +// appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) { +// @Override +// protected Dispatcher createDispatcher() { +// return new DrainDispatcher(); +// } +// protected ContainerAllocator createContainerAllocator( +// ClientService clientService, AppContext context) { +// return new MyContainerAllocator(rm, appAttemptId, context); +// }; +// }; +// +// Assert.assertEquals(0.0, rmApp.getProgress(), 0.0); +// +// mrApp.submit(conf); +// Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next() +// .getValue(); +// +// DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher(); +// +// MyContainerAllocator allocator = (MyContainerAllocator) mrApp +// .getContainerAllocator(); +// +// mrApp.waitForState(job, JobState.RUNNING); +// +// amDispatcher.await(); +// // Wait till all map-attempts request for containers +// for (Task t : job.getTasks().values()) { +// if (t.getType() == TaskType.MAP) { +// mrApp.waitForState(t.getAttempts().values().iterator().next(), +// TaskAttemptState.UNASSIGNED); +// } +// } +// amDispatcher.await(); +// +// allocator.schedule(); +// rmDispatcher.await(); +// amNodeManager.nodeHeartbeat(true); +// rmDispatcher.await(); +// allocator.schedule(); +// rmDispatcher.await(); +// +// // Wait for all map-tasks to be running +// for (Task t : job.getTasks().values()) { +// if (t.getType() == TaskType.MAP) { +// mrApp.waitForState(t, TaskState.RUNNING); +// } +// } +// +// allocator.schedule(); // Send heartbeat +// rmDispatcher.await(); +// Assert.assertEquals(0.05f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); +// +// // Finish off 1 map. +// Iterator it = job.getTasks().values().iterator(); +// finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); +// allocator.schedule(); +// rmDispatcher.await(); +// Assert.assertEquals(0.095f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f); +// +// // Finish off 7 more so that map-progress is 80% +// finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7); +// allocator.schedule(); +// rmDispatcher.await(); +// Assert.assertEquals(0.41f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f); +// +// // Finish off the 2 remaining maps +// finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); +// +// allocator.schedule(); +// rmDispatcher.await(); +// amNodeManager.nodeHeartbeat(true); +// rmDispatcher.await(); +// allocator.schedule(); +// rmDispatcher.await(); +// +// // Wait for all reduce-tasks to be running +// for (Task t : job.getTasks().values()) { +// if (t.getType() == TaskType.REDUCE) { +// mrApp.waitForState(t, TaskState.RUNNING); +// } +// } +// +// // Finish off 2 reduces +// finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); +// +// allocator.schedule(); +// rmDispatcher.await(); +// Assert.assertEquals(0.59f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); +// +// // Finish off the remaining 8 reduces. +// finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8); +// allocator.schedule(); +// rmDispatcher.await(); +// // Remaining is JobCleanup +// Assert.assertEquals(0.95f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); +// } +// +// private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node, +// MRApp mrApp, Iterator it, int nextN) throws Exception { +// Task task; +// for (int i=0; i contStatus = new ArrayList(1); +// contStatus.add(BuilderUtils.newContainerStatus(attempt.getAssignedContainerID(), +// ContainerState.COMPLETE, "", 0)); +// Map> statusUpdate = +// new HashMap>(1); +// statusUpdate.put(mrApp.getAppID(), contStatus); +// node.nodeHeartbeat(statusUpdate, true); +// rmDispatcher.await(); +// mrApp.getContext().getEventHandler().handle( +// new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE)); +// mrApp.waitForState(task, TaskState.SUCCEEDED); +// } +// +// @Test +// public void testReportedAppProgressWithOnlyMaps() throws Exception { +// +// LOG.info("Running testReportedAppProgressWithOnlyMaps"); +// +// Configuration conf = new Configuration(); +// final MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp rmApp = rm.submitApp(1024); +// rmDispatcher.await(); +// +// MockNM amNodeManager = rm.registerNode("amNM:1234", 11264); +// amNodeManager.nodeHeartbeat(true); +// rmDispatcher.await(); +// +// final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() +// .getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// rmDispatcher.await(); +// +// MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId( +// appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { +// @Override +// protected Dispatcher createDispatcher() { +// return new DrainDispatcher(); +// } +// protected ContainerAllocator createContainerAllocator( +// ClientService clientService, AppContext context) { +// return new MyContainerAllocator(rm, appAttemptId, context); +// }; +// }; +// +// Assert.assertEquals(0.0, rmApp.getProgress(), 0.0); +// +// mrApp.submit(conf); +// Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next() +// .getValue(); +// +// DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher(); +// +// MyContainerAllocator allocator = (MyContainerAllocator) mrApp +// .getContainerAllocator(); +// +// mrApp.waitForState(job, JobState.RUNNING); +// +// amDispatcher.await(); +// // Wait till all map-attempts request for containers +// for (Task t : job.getTasks().values()) { +// mrApp.waitForState(t.getAttempts().values().iterator().next(), +// TaskAttemptState.UNASSIGNED); +// } +// amDispatcher.await(); +// +// allocator.schedule(); +// rmDispatcher.await(); +// amNodeManager.nodeHeartbeat(true); +// rmDispatcher.await(); +// allocator.schedule(); +// rmDispatcher.await(); +// +// // Wait for all map-tasks to be running +// for (Task t : job.getTasks().values()) { +// mrApp.waitForState(t, TaskState.RUNNING); +// } +// +// allocator.schedule(); // Send heartbeat +// rmDispatcher.await(); +// Assert.assertEquals(0.05f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); +// +// Iterator it = job.getTasks().values().iterator(); +// +// // Finish off 1 map so that map-progress is 10% +// finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); +// allocator.schedule(); +// rmDispatcher.await(); +// Assert.assertEquals(0.14f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f); +// +// // Finish off 5 more map so that map-progress is 60% +// finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5); +// allocator.schedule(); +// rmDispatcher.await(); +// Assert.assertEquals(0.59f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); +// +// // Finish off remaining map so that map-progress is 100% +// finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4); +// allocator.schedule(); +// rmDispatcher.await(); +// Assert.assertEquals(0.95f, job.getProgress(), 0.001f); +// Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); +// } +// +// @Test +// public void testUpdatedNodes() throws Exception { +// Configuration conf = new Configuration(); +// MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp app = rm.submitApp(1024); +// dispatcher.await(); +// MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); +// amNodeManager.nodeHeartbeat(true); +// dispatcher.await(); +// +// ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() +// .getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// dispatcher.await(); +// +// JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); +// Job mockJob = mock(Job.class); +// MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, +// appAttemptId, mockJob); +// +// // add resources to scheduler +// MockNM nm1 = rm.registerNode("h1:1234", 10240); +// MockNM nm2 = rm.registerNode("h2:1234", 10240); +// dispatcher.await(); +// +// // create the map container request +// ContainerRequestEvent event = createReq(jobId, 1, 1024, +// new String[] { "h1" }); +// allocator.sendRequest(event); +// TaskAttemptId attemptId = event.getAttemptID(); +// +// TaskAttempt mockTaskAttempt = mock(TaskAttempt.class); +// when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId()); +// Task mockTask = mock(Task.class); +// when(mockTask.getAttempt(attemptId)).thenReturn(mockTaskAttempt); +// when(mockJob.getTask(attemptId.getTaskId())).thenReturn(mockTask); +// +// // this tells the scheduler about the requests +// List assigned = allocator.schedule(); +// dispatcher.await(); +// +// nm1.nodeHeartbeat(true); +// dispatcher.await(); +// // get the assignment +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals(1, assigned.size()); +// Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId()); +// // no updated nodes reported +// Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); +// Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); +// +// // mark nodes bad +// nm1.nodeHeartbeat(false); +// nm2.nodeHeartbeat(false); +// dispatcher.await(); +// +// // schedule response returns updated nodes +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals(0, assigned.size()); +// // updated nodes are reported +// Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); +// Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size()); +// Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); +// Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); +// allocator.getJobUpdatedNodeEvents().clear(); +// allocator.getTaskAttemptKillEvents().clear(); +// +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals(0, assigned.size()); +// // no updated nodes reported +// Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); +// Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); +// } +// +// @Test +// public void testBlackListedNodes() throws Exception { +// +// LOG.info("Running testBlackListedNodes"); +// +// Configuration conf = new Configuration(); +// conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); +// conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); +// conf.setInt( +// MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); +// +// MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp app = rm.submitApp(1024); +// dispatcher.await(); +// +// MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); +// amNodeManager.nodeHeartbeat(true); +// dispatcher.await(); +// +// ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() +// .getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// dispatcher.await(); +// +// JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); +// Job mockJob = mock(Job.class); +// when(mockJob.getReport()).thenReturn( +// MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, +// 0, 0, 0, 0, 0, 0, "jobfile", null, false)); +// MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, +// appAttemptId, mockJob); +// +// // add resources to scheduler +// MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); +// MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); +// MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); +// dispatcher.await(); +// +// // create the container request +// ContainerRequestEvent event1 = createReq(jobId, 1, 1024, +// new String[] { "h1" }); +// allocator.sendRequest(event1); +// +// // send 1 more request with different resource req +// ContainerRequestEvent event2 = createReq(jobId, 2, 1024, +// new String[] { "h2" }); +// allocator.sendRequest(event2); +// +// // send another request with different resource and priority +// ContainerRequestEvent event3 = createReq(jobId, 3, 1024, +// new String[] { "h3" }); +// allocator.sendRequest(event3); +// +// // this tells the scheduler about the requests +// // as nodes are not added, no allocations +// List assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// // Send events to blacklist nodes h1 and h2 +// ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); +// allocator.sendFailure(f1); +// ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false); +// allocator.sendFailure(f2); +// +// // update resources in scheduler +// nodeManager1.nodeHeartbeat(true); // Node heartbeat +// nodeManager2.nodeHeartbeat(true); // Node heartbeat +// dispatcher.await(); +// +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// // mark h1/h2 as bad nodes +// nodeManager1.nodeHeartbeat(false); +// nodeManager2.nodeHeartbeat(false); +// dispatcher.await(); +// +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// nodeManager3.nodeHeartbeat(true); // Node heartbeat +// dispatcher.await(); +// assigned = allocator.schedule(); +// dispatcher.await(); +// +// Assert.assertTrue("No of assignments must be 3", assigned.size() == 3); +// +// // validate that all containers are assigned to h3 +// for (TaskAttemptContainerAssignedEvent assig : assigned) { +// Assert.assertTrue("Assigned container host not correct", "h3".equals(assig +// .getContainer().getNodeId().getHost())); +// } +// } +// +// @Test +// public void testIgnoreBlacklisting() throws Exception { +// LOG.info("Running testIgnoreBlacklisting"); +// +// Configuration conf = new Configuration(); +// conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); +// conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); +// conf.setInt( +// MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33); +// +// MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher dispatcher = +// (DrainDispatcher) rm.getRMContext().getDispatcher(); +// +// // Submit the application +// RMApp app = rm.submitApp(1024); +// dispatcher.await(); +// +// MockNM[] nodeManagers = new MockNM[10]; +// int nmNum = 0; +// List assigned = null; +// nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); +// nodeManagers[0].nodeHeartbeat(true); +// dispatcher.await(); +// +// ApplicationAttemptId appAttemptId = +// app.getCurrentAppAttempt().getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// dispatcher.await(); +// +// JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); +// Job mockJob = mock(Job.class); +// when(mockJob.getReport()).thenReturn( +// MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, +// 0, 0, 0, 0, 0, 0, "jobfile", null, false)); +// MyContainerAllocator allocator = +// new MyContainerAllocator(rm, conf, appAttemptId, mockJob); +// +// // Known=1, blacklisted=0, ignore should be false - assign first container +// assigned = +// getContainerOnHost(jobId, 1, 1024, new String[] { "h1" }, +// nodeManagers[0], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// LOG.info("Failing container _1 on H1 (Node should be blacklisted and" +// + " ignore blacklisting enabled"); +// // Send events to blacklist nodes h1 and h2 +// ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); +// allocator.sendFailure(f1); +// +// // Test single node. +// // Known=1, blacklisted=1, ignore should be true - assign 1 +// assigned = +// getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, +// nodeManagers[0], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); +// // Known=2, blacklisted=1, ignore should be true - assign 1 anyway. +// assigned = +// getContainerOnHost(jobId, 3, 1024, new String[] { "h2" }, +// nodeManagers[1], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); +// // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. +// assigned = +// getContainerOnHost(jobId, 4, 1024, new String[] { "h3" }, +// nodeManagers[2], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// // Known=3, blacklisted=1, ignore should be true - assign 1 +// assigned = +// getContainerOnHost(jobId, 5, 1024, new String[] { "h1" }, +// nodeManagers[0], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); +// // Known=4, blacklisted=1, ignore should be false - assign 1 anyway +// assigned = +// getContainerOnHost(jobId, 6, 1024, new String[] { "h4" }, +// nodeManagers[3], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// // Test blacklisting re-enabled. +// // Known=4, blacklisted=1, ignore should be false - no assignment on h1 +// assigned = +// getContainerOnHost(jobId, 7, 1024, new String[] { "h1" }, +// nodeManagers[0], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// // RMContainerRequestor would have created a replacement request. +// +// // Blacklist h2 +// ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false); +// allocator.sendFailure(f2); +// +// // Test ignore blacklisting re-enabled +// // Known=4, blacklisted=2, ignore should be true. Should assign 2 +// // containers. +// assigned = +// getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, +// nodeManagers[0], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); +// +// // Known=4, blacklisted=2, ignore should be true. +// assigned = +// getContainerOnHost(jobId, 9, 1024, new String[] { "h2" }, +// nodeManagers[1], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// // Test blacklist while ignore blacklisting enabled +// ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false); +// allocator.sendFailure(f3); +// +// nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); +// // Known=5, blacklisted=3, ignore should be true. +// assigned = +// getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, +// nodeManagers[2], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// // Assign on 5 more nodes - to re-enable blacklisting +// for (int i = 0; i < 5; i++) { +// nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); +// assigned = +// getContainerOnHost(jobId, 11 + i, 1024, +// new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i], +// dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// } +// +// // Test h3 (blacklisted while ignoring blacklisting) is blacklisted. +// assigned = +// getContainerOnHost(jobId, 20, 1024, new String[] { "h3" }, +// nodeManagers[2], dispatcher, allocator); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// } +// +// private MockNM registerNodeManager(int i, MyResourceManager rm, +// DrainDispatcher dispatcher) throws Exception { +// MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240); +// dispatcher.await(); +// return nm; +// } +// +// private +// List getContainerOnHost(JobId jobId, +// int taskAttemptId, int memory, String[] hosts, MockNM mockNM, +// DrainDispatcher dispatcher, MyContainerAllocator allocator) +// throws Exception { +// ContainerRequestEvent reqEvent = +// createReq(jobId, taskAttemptId, memory, hosts); +// allocator.sendRequest(reqEvent); +// +// // Send the request to the RM +// List assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// // Heartbeat from the required nodeManager +// mockNM.nodeHeartbeat(true); +// dispatcher.await(); +// +// assigned = allocator.schedule(); +// dispatcher.await(); +// return assigned; +// } +// +// @Test +// public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { +// LOG.info("Running testBlackListedNodesWithSchedulingToThatNode"); +// +// Configuration conf = new Configuration(); +// conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); +// conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); +// conf.setInt( +// MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); +// +// MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp app = rm.submitApp(1024); +// dispatcher.await(); +// +// MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); +// amNodeManager.nodeHeartbeat(true); +// dispatcher.await(); +// +// ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() +// .getAppAttemptId(); +// rm.sendAMLaunched(appAttemptId); +// dispatcher.await(); +// +// JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); +// Job mockJob = mock(Job.class); +// when(mockJob.getReport()).thenReturn( +// MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, +// 0, 0, 0, 0, 0, 0, "jobfile", null, false)); +// MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, +// appAttemptId, mockJob); +// +// // add resources to scheduler +// MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); +// MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); +// dispatcher.await(); +// +// LOG.info("Requesting 1 Containers _1 on H1"); +// // create the container request +// ContainerRequestEvent event1 = createReq(jobId, 1, 1024, +// new String[] { "h1" }); +// allocator.sendRequest(event1); +// +// LOG.info("RM Heartbeat (to send the container requests)"); +// // this tells the scheduler about the requests +// // as nodes are not added, no allocations +// List assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// LOG.info("h1 Heartbeat (To actually schedule the containers)"); +// // update resources in scheduler +// nodeManager1.nodeHeartbeat(true); // Node heartbeat +// dispatcher.await(); +// +// LOG.info("RM Heartbeat (To process the scheduled containers)"); +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); +// +// LOG.info("Failing container _1 on H1 (should blacklist the node)"); +// // Send events to blacklist nodes h1 and h2 +// ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); +// allocator.sendFailure(f1); +// +// //At this stage, a request should be created for a fast fail map +// //Create a FAST_FAIL request for a previously failed map. +// ContainerRequestEvent event1f = createReq(jobId, 1, 1024, +// new String[] { "h1" }, true, false); +// allocator.sendRequest(event1f); +// +// //Update the Scheduler with the new requests. +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// // send another request with different resource and priority +// ContainerRequestEvent event3 = createReq(jobId, 3, 1024, +// new String[] { "h1", "h3" }); +// allocator.sendRequest(event3); +// +// //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container. +// //RM is only aware of the prio:5 container +// +// LOG.info("h1 Heartbeat (To actually schedule the containers)"); +// // update resources in scheduler +// nodeManager1.nodeHeartbeat(true); // Node heartbeat +// dispatcher.await(); +// +// LOG.info("RM Heartbeat (To process the scheduled containers)"); +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// //RMContainerAllocator gets assigned a p:5 on a blacklisted node. +// +// //Send a release for the p:5 container + another request. +// LOG.info("RM Heartbeat (To process the re-scheduled containers)"); +// assigned = allocator.schedule(); +// dispatcher.await(); +// Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); +// +// //Hearbeat from H3 to schedule on this host. +// LOG.info("h3 Heartbeat (To re-schedule the containers)"); +// nodeManager3.nodeHeartbeat(true); // Node heartbeat +// dispatcher.await(); +// +// LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); +// assigned = allocator.schedule(); +// dispatcher.await(); +// +// // For debugging +// for (TaskAttemptContainerAssignedEvent assig : assigned) { +// LOG.info(assig.getTaskAttemptID() + +// " assgined to " + assig.getContainer().getId() + +// " with priority " + assig.getContainer().getPriority()); +// } +// +// Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); +// +// // validate that all containers are assigned to h3 +// for (TaskAttemptContainerAssignedEvent assig : assigned) { +// Assert.assertEquals("Assigned container " + assig.getContainer().getId() +// + " host not correct", "h3", assig.getContainer().getNodeId().getHost()); +// } +// } +// +// private static class MyFifoScheduler extends FifoScheduler { +// +// public MyFifoScheduler(RMContext rmContext) { +// super(); +// try { +// Configuration conf = new Configuration(); +// reinitialize(conf, new ContainerTokenSecretManager(conf), +// rmContext); +// } catch (IOException ie) { +// LOG.info("add application failed with ", ie); +// assert (false); +// } +// } +// +// // override this to copy the objects otherwise FifoScheduler updates the +// // numContainers in same objects as kept by RMContainerAllocator +// @Override +// public synchronized Allocation allocate( +// ApplicationAttemptId applicationAttemptId, List ask, +// List release) { +// List askCopy = new ArrayList(); +// for (ResourceRequest req : ask) { +// ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req +// .getPriority(), req.getHostName(), req.getCapability(), req +// .getNumContainers()); +// askCopy.add(reqCopy); +// } +// return super.allocate(applicationAttemptId, askCopy, release); +// } +// } +// +// private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, +// int memory, String[] hosts) { +// return createReq(jobId, taskAttemptId, memory, hosts, false, false); +// } +// +// private ContainerRequestEvent +// createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts, +// boolean earlierFailedAttempt, boolean reduce) { +// TaskId taskId; +// if (reduce) { +// taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); +// } else { +// taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); +// } +// TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, +// taskAttemptId); +// Resource containerNeed = BuilderUtils.newResource(memory); +// if (earlierFailedAttempt) { +// return ContainerRequestEvent +// .createContainerRequestEventForFailedContainer(attemptId, +// containerNeed); +// } +// return new ContainerRequestEvent(attemptId, containerNeed, hosts, +// new String[] { NetworkTopology.DEFAULT_RACK }); +// } +// +// private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, +// String host, boolean reduce) { +// TaskId taskId; +// if (reduce) { +// taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); +// } else { +// taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); +// } +// TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, +// taskAttemptId); +// return new ContainerFailedEvent(attemptId, host); +// } +// +// private void checkAssignments(ContainerRequestEvent[] requests, +// List assignments, +// boolean checkHostMatch) { +// Assert.assertNotNull("Container not assigned", assignments); +// Assert.assertEquals("Assigned count not correct", requests.length, +// assignments.size()); +// +// // check for uniqueness of containerIDs +// Set containerIds = new HashSet(); +// for (TaskAttemptContainerAssignedEvent assigned : assignments) { +// containerIds.add(assigned.getContainer().getId()); +// } +// Assert.assertEquals("Assigned containers must be different", assignments +// .size(), containerIds.size()); +// +// // check for all assignment +// for (ContainerRequestEvent req : requests) { +// TaskAttemptContainerAssignedEvent assigned = null; +// for (TaskAttemptContainerAssignedEvent ass : assignments) { +// if (ass.getTaskAttemptID().equals(req.getAttemptID())) { +// assigned = ass; +// break; +// } +// } +// checkAssignment(req, assigned, checkHostMatch); +// } +// } +// +// private void checkAssignment(ContainerRequestEvent request, +// TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) { +// Assert.assertNotNull("Nothing assigned to attempt " +// + request.getAttemptID(), assigned); +// Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(), +// assigned.getTaskAttemptID()); +// if (checkHostMatch) { +// Assert.assertTrue("Not assigned to requested host", Arrays.asList( +// request.getHosts()).contains( +// assigned.getContainer().getNodeId().toString())); +// } +// } +// +// // Mock RMContainerAllocator +// // Instead of talking to remote Scheduler,uses the local Scheduler +// private static class MyContainerAllocator extends RMContainerAllocator { +// static final List events +// = new ArrayList(); +// static final List taskAttemptKillEvents +// = new ArrayList(); +// static final List jobUpdatedNodeEvents +// = new ArrayList(); +// private MyResourceManager rm; +// +// private static AppContext createAppContext( +// ApplicationAttemptId appAttemptId, Job job) { +// AppContext context = mock(AppContext.class); +// ApplicationId appId = appAttemptId.getApplicationId(); +// when(context.getApplicationID()).thenReturn(appId); +// when(context.getApplicationAttemptId()).thenReturn(appAttemptId); +// when(context.getJob(isA(JobId.class))).thenReturn(job); +// when(context.getClusterInfo()).thenReturn( +// new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils +// .newResource(10240))); +// when(context.getEventHandler()).thenReturn(new EventHandler() { +// @Override +// public void handle(Event event) { +// // Only capture interesting events. +// if (event instanceof TaskAttemptContainerAssignedEvent) { +// events.add((TaskAttemptContainerAssignedEvent) event); +// } else if (event instanceof TaskAttemptKillEvent) { +// taskAttemptKillEvents.add((TaskAttemptKillEvent)event); +// } else if (event instanceof JobUpdatedNodesEvent) { +// jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event); +// } +// } +// }); +// return context; +// } +// +// private static ClientService createMockClientService() { +// ClientService service = mock(ClientService.class); +// when(service.getBindAddress()).thenReturn( +// NetUtils.createSocketAddr("localhost:4567")); +// when(service.getHttpPort()).thenReturn(890); +// return service; +// } +// +// // Use this constructor when using a real job. +// MyContainerAllocator(MyResourceManager rm, +// ApplicationAttemptId appAttemptId, AppContext context) { +// super(createMockClientService(), context); +// this.rm = rm; +// } +// +// // Use this constructor when you are using a mocked job. +// public MyContainerAllocator(MyResourceManager rm, Configuration conf, +// ApplicationAttemptId appAttemptId, Job job) { +// super(createMockClientService(), createAppContext(appAttemptId, job)); +// this.rm = rm; +// super.init(conf); +// super.start(); +// } +// +// @Override +// protected AMRMProtocol createSchedulerProxy() { +// return this.rm.getApplicationMasterService(); +// } +// +// @Override +// protected void register() { +// super.register(); +// } +// +// @Override +// protected void unregister() { +// } +// +// @Override +// protected Resource getMinContainerCapability() { +// return BuilderUtils.newResource(1024); +// } +// +// @Override +// protected Resource getMaxContainerCapability() { +// return BuilderUtils.newResource(10240); +// } +// +// public void sendRequest(ContainerRequestEvent req) { +// sendRequests(Arrays.asList(new ContainerRequestEvent[] { req })); +// } +// +// public void sendRequests(List reqs) { +// for (ContainerRequestEvent req : reqs) { +// super.handleEvent(req); +// } +// } +// +// public void sendFailure(ContainerFailedEvent f) { +// super.handleEvent(f); +// } +// +// // API to be used by tests +// public List schedule() { +// // run the scheduler +// try { +// super.heartbeat(); +// } catch (Exception e) { +// LOG.error("error in heartbeat ", e); +// throw new YarnException(e); +// } +// +// List result +// = new ArrayList(events); +// events.clear(); +// return result; +// } +// +// List getTaskAttemptKillEvents() { +// return taskAttemptKillEvents; +// } +// +// List getJobUpdatedNodeEvents() { +// return jobUpdatedNodeEvents; +// } +// +// @Override +// protected void startAllocatorThread() { +// // override to NOT start thread +// } +// +// } +// +// @Test +// public void testReduceScheduling() throws Exception { +// int totalMaps = 10; +// int succeededMaps = 1; +// int scheduledMaps = 10; +// int scheduledReduces = 0; +// int assignedMaps = 2; +// int assignedReduces = 0; +// int mapResourceReqt = 1024; +// int reduceResourceReqt = 2*1024; +// int numPendingReduces = 4; +// float maxReduceRampupLimit = 0.5f; +// float reduceSlowStart = 0.2f; +// +// RMContainerAllocator allocator = mock(RMContainerAllocator.class); +// doCallRealMethod().when(allocator). +// scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), +// anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat()); +// +// // Test slow-start +// allocator.scheduleReduces( +// totalMaps, succeededMaps, +// scheduledMaps, scheduledReduces, +// assignedMaps, assignedReduces, +// mapResourceReqt, reduceResourceReqt, +// numPendingReduces, +// maxReduceRampupLimit, reduceSlowStart); +// verify(allocator, never()).setIsReduceStarted(true); +// +// // verify slow-start still in effect when no more maps need to +// // be scheduled but some have yet to complete +// allocator.scheduleReduces( +// totalMaps, succeededMaps, +// 0, scheduledReduces, +// totalMaps - succeededMaps, assignedReduces, +// mapResourceReqt, reduceResourceReqt, +// numPendingReduces, +// maxReduceRampupLimit, reduceSlowStart); +// verify(allocator, never()).setIsReduceStarted(true); +// verify(allocator, never()).scheduleAllReduces(); +// +// succeededMaps = 3; +// allocator.scheduleReduces( +// totalMaps, succeededMaps, +// scheduledMaps, scheduledReduces, +// assignedMaps, assignedReduces, +// mapResourceReqt, reduceResourceReqt, +// numPendingReduces, +// maxReduceRampupLimit, reduceSlowStart); +// verify(allocator, times(1)).setIsReduceStarted(true); +// +// // Test reduce ramp-up +// doReturn(100 * 1024).when(allocator).getMemLimit(); +// allocator.scheduleReduces( +// totalMaps, succeededMaps, +// scheduledMaps, scheduledReduces, +// assignedMaps, assignedReduces, +// mapResourceReqt, reduceResourceReqt, +// numPendingReduces, +// maxReduceRampupLimit, reduceSlowStart); +// verify(allocator).rampUpReduces(anyInt()); +// verify(allocator, never()).rampDownReduces(anyInt()); +// +// // Test reduce ramp-down +// scheduledReduces = 3; +// doReturn(10 * 1024).when(allocator).getMemLimit(); +// allocator.scheduleReduces( +// totalMaps, succeededMaps, +// scheduledMaps, scheduledReduces, +// assignedMaps, assignedReduces, +// mapResourceReqt, reduceResourceReqt, +// numPendingReduces, +// maxReduceRampupLimit, reduceSlowStart); +// verify(allocator).rampDownReduces(anyInt()); +// } +// +// private static class RecalculateContainerAllocator extends MyContainerAllocator { +// public boolean recalculatedReduceSchedule = false; +// +// public RecalculateContainerAllocator(MyResourceManager rm, +// Configuration conf, ApplicationAttemptId appAttemptId, Job job) { +// super(rm, conf, appAttemptId, job); +// } +// +// @Override +// public void scheduleReduces(int totalMaps, int completedMaps, +// int scheduledMaps, int scheduledReduces, int assignedMaps, +// int assignedReduces, int mapResourceReqt, int reduceResourceReqt, +// int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) { +// recalculatedReduceSchedule = true; +// } +// } +// +// @Test +// public void testCompletedTasksRecalculateSchedule() throws Exception { +// LOG.info("Running testCompletedTasksRecalculateSchedule"); +// +// Configuration conf = new Configuration(); +// final MyResourceManager rm = new MyResourceManager(conf); +// rm.start(); +// DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() +// .getDispatcher(); +// +// // Submit the application +// RMApp app = rm.submitApp(1024); +// dispatcher.await(); +// +// ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() +// .getAppAttemptId(); +// JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); +// Job job = mock(Job.class); +// when(job.getReport()).thenReturn( +// MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, +// 0, 0, 0, 0, 0, 0, "jobfile", null, false)); +// doReturn(10).when(job).getTotalMaps(); +// doReturn(10).when(job).getTotalReduces(); +// doReturn(0).when(job).getCompletedMaps(); +// RecalculateContainerAllocator allocator = +// new RecalculateContainerAllocator(rm, conf, appAttemptId, job); +// allocator.schedule(); +// +// allocator.recalculatedReduceSchedule = false; +// allocator.schedule(); +// Assert.assertFalse("Unexpected recalculate of reduce schedule", +// allocator.recalculatedReduceSchedule); +// +// doReturn(1).when(job).getCompletedMaps(); +// allocator.schedule(); +// Assert.assertTrue("Expected recalculate of reduce schedule", +// allocator.recalculatedReduceSchedule); +// } +// +// public static void main(String[] args) throws Exception { +// TestRMContainerAllocator t = new TestRMContainerAllocator(); +// t.testSimple(); +// t.testResource(); +// t.testMapReduceScheduling(); +// t.testReportedAppProgress(); +// t.testReportedAppProgressWithOnlyMaps(); +// t.testBlackListedNodes(); +// t.testCompletedTasksRecalculateSchedule(); +// } +// +//}