tez-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1457129 [35/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Date: Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncherImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncherImpl.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncherImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.launcher;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestContainerLauncherImpl {
+  static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class);
+  private static final RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  
+  private static class ContainerLauncherImplUnderTest extends 
+    ContainerLauncherImpl {
+
+    private YarnRPC rpc;
+    
+    public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) {
+      super(context);
+      this.rpc = rpc;
+    }
+    
+    @Override
+    protected YarnRPC createYarnRPC(Configuration conf) {
+      return rpc;
+    }
+    
+    public void waitForPoolToIdle() throws InterruptedException {
+      // The sleep is unfortunate, but it ensures that the other thread has had
+      // time to insert the event into the queue and start processing it.
+      // Without it we were seeing InterruptedExceptions from within eventQueue.
+      Thread.sleep(100l);
+      LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
+          " POOL SIZE 2: "+this.launcherPool.getQueue().size()+
+          " ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
+      while(!this.eventQueue.isEmpty() || 
+          !this.launcherPool.getQueue().isEmpty() || 
+          this.launcherPool.getActiveCount() > 0) {
+        Thread.sleep(100l);
+        LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
+            " POOL SIZE 2: "+this.launcherPool.getQueue().size()+
+            " ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
+      }
+      LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
+          " POOL SIZE 2: "+this.launcherPool.getQueue().size()+
+          " ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
+    }
+  }
+  
+  public static ContainerId makeContainerId(long ts, int appId, int attemptId,
+      int id) {
+    return BuilderUtils.newContainerId(
+      BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(ts, appId), attemptId), id);
+  }
+
+  public static TaskAttemptId makeTaskAttemptId(long ts, int appId, int taskId, 
+      TaskType taskType, int id) {
+    ApplicationId aID = BuilderUtils.newApplicationId(ts, appId);
+    JobId jID = MRBuilderUtils.newJobId(aID, id);
+    TaskId tID = MRBuilderUtils.newTaskId(jID, taskId, taskType);
+    return MRBuilderUtils.newTaskAttemptId(tID, id);
+  }
+  
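+  // Sends a launch request followed by a stop request for the same container and
+  // verifies that matching startContainer and stopContainer calls reach the
+  // mocked ContainerManager.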
+  @Test
+  public void testHandle() throws Exception {
+    LOG.info("STARTING testHandle");
+    YarnRPC mockRpc = mock(YarnRPC.class);
+    AppContext mockContext = mock(AppContext.class);
+    @SuppressWarnings("rawtypes")
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+    AMNodeMap amNodeMap = new AMNodeMap(mockEventHandler, mockContext);
+    when(mockContext.getAllNodes()).thenReturn(amNodeMap);
+
+    ContainerManager mockCM = mock(ContainerManager.class);
+    when(mockRpc.getProxy(eq(ContainerManager.class), 
+        any(InetSocketAddress.class), any(Configuration.class)))
+        .thenReturn(mockCM);
+    
+    ContainerLauncherImplUnderTest ut = 
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+    
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0l, 0, 0, 1);
+      NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 8000);
+      amNodeMap.nodeSeen(nodeId);
+      StartContainerResponse startResp = 
+        recordFactory.newRecordInstance(StartContainerResponse.class);
+      startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, 
+          ShuffleHandler.serializeMetaData(80));
+      
+
+      LOG.info("inserting launch event");
+      NMCommunicatorLaunchRequestEvent mockLaunchEvent = mock(NMCommunicatorLaunchRequestEvent.class);
+      when(mockLaunchEvent.getType()).thenReturn(
+          NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+      when(mockLaunchEvent.getContainerId()).thenReturn(contId);
+      when(mockLaunchEvent.getNodeId()).thenReturn(nodeId);
+      when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+      ut.handle(mockLaunchEvent);
+      
+      ut.waitForPoolToIdle();
+      
+      verify(mockCM).startContainer(any(StartContainerRequest.class));
+      
+      LOG.info("inserting cleanup event");
+      NMCommunicatorStopRequestEvent mockCleanupEvent = mock(NMCommunicatorStopRequestEvent.class);
+      when(mockCleanupEvent.getType()).thenReturn(
+          NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+      when(mockCleanupEvent.getContainerId()).thenReturn(contId);
+      when(mockCleanupEvent.getNodeId()).thenReturn(nodeId);
+      ut.handle(mockCleanupEvent);
+      
+      ut.waitForPoolToIdle();
+      
+      verify(mockCM).stopContainer(any(StopContainerRequest.class));
+    } finally {
+      ut.stop();
+    }
+  }
+  
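+  // Sends the stop request before the corresponding launch request and verifies
+  // that neither stopContainer nor startContainer is ever invoked for the
+  // out-of-order container.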
+  @Test
+  public void testOutOfOrder() throws Exception {
+    LOG.info("STARTING testOutOfOrder");
+    YarnRPC mockRpc = mock(YarnRPC.class);
+    AppContext mockContext = mock(AppContext.class);
+    @SuppressWarnings("rawtypes")
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+    AMNodeMap amNodeMap = new AMNodeMap(mockEventHandler, mockContext);
+    when(mockContext.getAllNodes()).thenReturn(amNodeMap);
+
+    ContainerManager mockCM = mock(ContainerManager.class);
+    when(mockRpc.getProxy(eq(ContainerManager.class), 
+        any(InetSocketAddress.class), any(Configuration.class)))
+        .thenReturn(mockCM);
+    
+    ContainerLauncherImplUnderTest ut = 
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+    
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0l, 0, 0, 1);
+      NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 8000);
+      amNodeMap.nodeSeen(nodeId);
+      StartContainerResponse startResp = 
+        recordFactory.newRecordInstance(StartContainerResponse.class);
+      startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, 
+          ShuffleHandler.serializeMetaData(80));
+
+      LOG.info("inserting cleanup event");
+      NMCommunicatorStopRequestEvent mockCleanupEvent = mock(NMCommunicatorStopRequestEvent.class);
+      when(mockCleanupEvent.getType()).thenReturn(
+          NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+      when(mockCleanupEvent.getContainerId()).thenReturn(contId);
+      when(mockCleanupEvent.getNodeId()).thenReturn(nodeId);
+      ut.handle(mockCleanupEvent);
+      
+      ut.waitForPoolToIdle();
+      
+      verify(mockCM, never()).stopContainer(any(StopContainerRequest.class));
+
+      LOG.info("inserting launch event");
+      NMCommunicatorLaunchRequestEvent mockLaunchEvent = mock(NMCommunicatorLaunchRequestEvent.class);
+      when(mockLaunchEvent.getType()).thenReturn(
+          NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+      when(mockLaunchEvent.getContainerId()).thenReturn(contId);
+      when(mockLaunchEvent.getNodeId()).thenReturn(nodeId);
+      
+      when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+      ut.handle(mockLaunchEvent);
+      
+      ut.waitForPoolToIdle();
+      
+      verify(mockCM, never()).startContainer(any(StartContainerRequest.class));
+    } finally {
+      ut.stop();
+    }
+  }
+
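+  // Launches a container and then stops the launcher without sending a cleanup
+  // event; stopping the service is expected to kill the still-running container.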
+  @Test
+  public void testMyShutdown() throws Exception {
+    LOG.info("in test Shutdown");
+
+    YarnRPC mockRpc = mock(YarnRPC.class);
+    AppContext mockContext = mock(AppContext.class);
+    @SuppressWarnings("rawtypes")
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+    AMNodeMap amNodeMap = new AMNodeMap(mockEventHandler, mockContext);
+    when(mockContext.getAllNodes()).thenReturn(amNodeMap);
+
+    ContainerManager mockCM = mock(ContainerManager.class);
+    when(mockRpc.getProxy(eq(ContainerManager.class),
+        any(InetSocketAddress.class), any(Configuration.class)))
+        .thenReturn(mockCM);
+
+    ContainerLauncherImplUnderTest ut =
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0l, 0, 0, 1);
+      NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 8000);
+      amNodeMap.nodeSeen(nodeId);
+      StartContainerResponse startResp =
+        recordFactory.newRecordInstance(StartContainerResponse.class);
+      startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+          ShuffleHandler.serializeMetaData(80));
+
+      LOG.info("inserting launch event");
+      NMCommunicatorLaunchRequestEvent mockLaunchEvent = mock(NMCommunicatorLaunchRequestEvent.class);
+      when(mockLaunchEvent.getType()).thenReturn(
+          NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+      when(mockLaunchEvent.getContainerId()).thenReturn(contId);
+      when(mockLaunchEvent.getNodeId()).thenReturn(nodeId);
+      when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+      ut.handle(mockLaunchEvent);
+
+      ut.waitForPoolToIdle();
+
+      verify(mockCM).startContainer(any(StartContainerRequest.class));
+
+      // skip cleanup and make sure stop kills the container
+
+    } finally {
+      ut.stop();
+      verify(mockCM).stopContainer(any(StopContainerRequest.class));
+    }
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/metrics/TestMRAppMetrics.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/metrics/TestMRAppMetrics.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/metrics/TestMRAppMetrics.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/metrics/TestMRAppMetrics.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.metrics;
+
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.apache.hadoop.test.MockitoMaker.*;
+
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestMRAppMetrics {
+
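+  // Drives MRAppMetrics through job submit/prepare/run/fail/kill/complete
+  // transitions, plus map and reduce task transitions, and then checks the
+  // resulting counters and gauges by name.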
+  @Test public void testNames() {
+    Job job = mock(Job.class);
+    Task mapTask = make(stub(Task.class).returning(TaskType.MAP).
+                        from.getType());
+    Task reduceTask = make(stub(Task.class).returning(TaskType.REDUCE).
+                           from.getType());
+    MRAppMetrics metrics = MRAppMetrics.create();
+
+    metrics.submittedJob(job);
+    metrics.waitingTask(mapTask);
+    metrics.waitingTask(reduceTask);
+    metrics.preparingJob(job);
+    metrics.submittedJob(job);
+    metrics.waitingTask(mapTask);
+    metrics.waitingTask(reduceTask);
+    metrics.preparingJob(job);
+    metrics.submittedJob(job);
+    metrics.waitingTask(mapTask);
+    metrics.waitingTask(reduceTask);
+    metrics.preparingJob(job);
+    metrics.endPreparingJob(job);
+    metrics.endPreparingJob(job);
+    metrics.endPreparingJob(job);
+
+    metrics.runningJob(job);
+    metrics.launchedTask(mapTask);
+    metrics.runningTask(mapTask);
+    metrics.failedTask(mapTask);
+    metrics.endWaitingTask(reduceTask);
+    metrics.endRunningTask(mapTask);
+    metrics.endRunningJob(job);
+    metrics.failedJob(job);
+
+    metrics.runningJob(job);
+    metrics.launchedTask(mapTask);
+    metrics.runningTask(mapTask);
+    metrics.killedTask(mapTask);
+    metrics.endWaitingTask(reduceTask);
+    metrics.endRunningTask(mapTask);
+    metrics.endRunningJob(job);
+    metrics.killedJob(job);
+
+    metrics.runningJob(job);
+    metrics.launchedTask(mapTask);
+    metrics.runningTask(mapTask);
+    metrics.completedTask(mapTask);
+    metrics.endRunningTask(mapTask);
+    metrics.launchedTask(reduceTask);
+    metrics.runningTask(reduceTask);
+    metrics.completedTask(reduceTask);
+    metrics.endRunningTask(reduceTask);
+    metrics.endRunningJob(job);
+    metrics.completedJob(job);
+
+    checkMetrics(/*job*/3, 1, 1, 1, 0, 0,
+                 /*map*/3, 1, 1, 1, 0, 0,
+                 /*reduce*/1, 1, 0, 0, 0, 0);
+  }
+
+  private void checkMetrics(int jobsSubmitted, int jobsCompleted,
+      int jobsFailed, int jobsKilled, int jobsPreparing, int jobsRunning,
+      int mapsLaunched, int mapsCompleted, int mapsFailed, int mapsKilled,
+      int mapsRunning, int mapsWaiting, int reducesLaunched,
+      int reducesCompleted, int reducesFailed, int reducesKilled,
+      int reducesRunning, int reducesWaiting) {
+    MetricsRecordBuilder rb = getMetrics("MRAppMetrics");
+    assertCounter("JobsSubmitted", jobsSubmitted, rb);
+    assertCounter("JobsCompleted", jobsCompleted, rb);
+    assertCounter("JobsFailed", jobsFailed, rb);
+    assertCounter("JobsKilled", jobsKilled, rb);
+    assertGauge("JobsPreparing", jobsPreparing, rb);
+    assertGauge("JobsRunning", jobsRunning, rb);
+
+    assertCounter("MapsLaunched", mapsLaunched, rb);
+    assertCounter("MapsCompleted", mapsCompleted, rb);
+    assertCounter("MapsFailed", mapsFailed, rb);
+    assertCounter("MapsKilled", mapsKilled, rb);
+    assertGauge("MapsRunning", mapsRunning, rb);
+    assertGauge("MapsWaiting", mapsWaiting, rb);
+
+    assertCounter("ReducesLaunched", reducesLaunched, rb);
+    assertCounter("ReducesCompleted", reducesCompleted, rb);
+    assertCounter("ReducesFailed", reducesFailed, rb);
+    assertCounter("ReducesKilled", reducesKilled, rb);
+    assertGauge("ReducesRunning", reducesRunning, rb);
+    assertGauge("ReducesWaiting", reducesWaiting, rb);
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,1085 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyFloat;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventAssignTA;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventNodeCountUpdated;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+import org.junit.Test;
+
+
+public class TestRMContainerAllocator {
+
+  static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
+
+  static final Priority MAP_PRIORITY = RMContainerAllocatorForTest.getMapPriority();
+  static final Priority REDUCE_PRIORITY = RMContainerAllocatorForTest.getReducePriority();
+  static final Priority FAST_FAIL_MAP_PRIORITY = RMContainerAllocatorForTest.getFailedMapPriority();
+  
+  private static final int port = 3333;
+
+  /**
+   * Verifies simple allocation. Matches pending requests to assigned containers.
+   */
+  @Test
+  public void testSimple() {
+    LOG.info("Running testSimple");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 1024,
+        new String[] { "h2" });
+    AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 1024,
+        new String[] { "h3" });
+
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+    scheduler.handleEvent(event3);
+
+    assertEquals(3, rmComm.addRequests.size());
+
+    Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h2", 1024, MAP_PRIORITY);
+    Container container3 = newContainer(appContext, 3, "h3", 1024, MAP_PRIORITY);
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+    containerIds.add(container3.getId());
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event2,
+        event3 }, eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+  }
+  
+    
+  @Test 
+  public void testMapNodeLocality() throws Exception {
+    // test checks that ordering of allocated containers list from the RM does 
+    // not affect the map->container assignment done by the AM. If there is a 
+    // node local container available for a map then it should be assigned to 
+    // that container and not a rack-local container that happened to be seen 
+    // earlier in the allocated containers list from the RM.
+    // Regression test for MAPREDUCE-4893
+    LOG.info("Running testMapNodeLocality");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+    
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 1024,
+        new String[] { "h1" });
+    AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 1024,
+        new String[] { "h2" });
+    
+    // Register the asks with the AMScheduler
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+    scheduler.handleEvent(event3);
+
+    assertEquals(3, rmComm.addRequests.size());
+
+
+    // Update resources in the scheduler.
+    // The rack-local container comes first. This makes node h3 the first in the
+    // list of allocated containers, but it should not be assigned to task1.
+    // The node-local containers come next. This allocates 2 node-local
+    // containers for task1 and task2, which should be matched to those tasks.
+    Container container1 = newContainer(appContext, 1, "h3", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h1", 2048, MAP_PRIORITY);
+    Container container3 = newContainer(appContext, 3, "h1", 2048, MAP_PRIORITY);
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+    containerIds.add(container3.getId());
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event2, event3 },
+        eventHandler.launchRequests, eventHandler.assignEvents, false,
+        appContext);
+    
+    // container1 on node 3 should be the last one assigned
+    // it should be assigned rack-local to task 3
+    AMContainerEventAssignTA assignEvent = eventHandler.assignEvents.get(
+                                          eventHandler.assignEvents.size() - 1);
+    Assert.assertTrue(assignEvent.getContainerId().equals(container1.getId()));
+    Assert.assertTrue(
+        "Unexpected node local assignment",
+        !Arrays.asList(event3.getHosts()).contains(
+            appContext.getAllContainers().get(assignEvent.getContainerId())
+                .getContainer().getNodeId().getHost()));
+  }
+  
+  /**
+   * Verifies that allocated containers are matched to requests based on the resource ask.
+   */
+  @Test
+  public void testResource() {
+    LOG.info("Running testResource");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 2048,
+        new String[] { "h2" });
+
+    // Register the asks with the AMScheduler
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+
+    assertEquals(2, rmComm.addRequests.size());
+
+    Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h2", 2048, MAP_PRIORITY);
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event2 },
+        eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+  }
+  
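+  /**
+   * Verifies scheduling with a mix of a previously failed map, a reduce and a
+   * fresh map. The reduce request is held back until maps are assigned,
+   * unusable containers are stopped, and the remaining containers are matched
+   * to the correct attempts.
+   */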
+  @Test
+  public void testMapReduceScheduling() {
+    LOG.info("Running testMapReduceScheduling");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    // Map, previously failed.
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 2048,
+        new String[] { "h1", "h2" }, true, false);
+
+    // Reduce, first attempt.
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 3000,
+        new String[] { "h1" }, false, true);
+
+    // Map, first attempt.
+    AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 2048,
+        new String[] { "h3" }, false, false);
+
+    // Register the asks with the AMScheduler
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+    scheduler.handleEvent(event3);
+
+    // The reduce should not have been asked for yet - event2
+    assertEquals(2, rmComm.addRequests.size());
+
+   
+    Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h1", 3072,
+        REDUCE_PRIORITY);
+    Container container3 = newContainer(appContext, 3, "h3", 2048, MAP_PRIORITY);
+    Container container4 = newContainer(appContext, 4, "h1", 2048,
+        FAST_FAIL_MAP_PRIORITY);
+    // Container1 - release low mem.
+    // Container2 - release no pending reduce request.
+    // Container3 - assign to t3
+    // Container4 - assign to t1
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+    containerIds.add(container3.getId());
+    containerIds.add(container4.getId());
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    // Expect two stop container events, for Container1 and Container2.
+    assertEquals(2, eventHandler.stopEvents.size());
+
+    // Since maps have now been assigned containers, verify that a request is
+    // sent out for the pending reduce task.
+    assertEquals(3, rmComm.addRequests.size());
+
+    // Verify map assignments.
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event1, event3 },
+        eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+
+    eventHandler.reset();
+
+    Container container5 = newContainer(appContext, 5, "h1", 3072,
+        REDUCE_PRIORITY); // assign to t2
+    containerIds.clear();
+    containerIds.add(container5.getId());
+    allocatedEvent = new AMSchedulerEventContainersAllocated(containerIds,
+        false);
+    scheduler.handleEvent(allocatedEvent);
+
+    // Verify reduce assignment.
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event2 },
+        eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+  }
+  
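+  /**
+   * Exercises scheduleReduces() on a mocked allocator: reduces are held back
+   * until the slow-start threshold is crossed, then ramped up when headroom
+   * allows and ramped down when it does not.
+   */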
+  @Test
+  public void testReduceScheduling() throws Exception {
+    int totalMaps = 10;
+    int succeededMaps = 1;
+    int scheduledMaps = 10;
+    int scheduledReduces = 0;
+    int assignedMaps = 2;
+    int assignedReduces = 0;
+    int mapResourceReqt = 1024;
+    int reduceResourceReqt = 2*1024;
+    int numPendingReduces = 4;
+    float maxReduceRampupLimit = 0.5f;
+    float reduceSlowStart = 0.2f;
+    
+    RMContainerAllocator allocator = mock(RMContainerAllocator.class);
+    doCallRealMethod().when(allocator).
+        scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
+            anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
+    
+    // Test slow-start
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, never()).setIsReduceStarted(true);
+    
+    // verify slow-start still in effect when no more maps need to
+    // be scheduled but some have yet to complete
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps,
+        0, scheduledReduces,
+        totalMaps - succeededMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, never()).setIsReduceStarted(true);
+    verify(allocator, never()).scheduleAllReduces();
+
+    succeededMaps = 3;
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, times(1)).setIsReduceStarted(true);
+    
+    // Test reduce ramp-up
+    doReturn(100 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator).rampUpReduces(anyInt());
+    verify(allocator, never()).rampDownReduces(anyInt());
+
+    // Test reduce ramp-down
+    scheduledReduces = 3;
+    doReturn(10 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps, 
+        scheduledMaps, scheduledReduces, 
+        assignedMaps, assignedReduces, 
+        mapResourceReqt, reduceResourceReqt, 
+        numPendingReduces, 
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator).rampDownReduces(anyInt());
+  }
+
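+  /**
+   * Blacklists two of three nodes and verifies that only the container on the
+   * healthy node is assigned, while the other two containers are stopped and
+   * replacement requests are sent out.
+   */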
+  @Test
+  public void testBlackListedNodes() throws Exception {
+    LOG.info("Running testBlackListedNodes");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    // 3 requests on 3 hosts.
+    AMSchedulerTALaunchRequestEvent event1 = createTALaunchReq(jobId, 1, 1024,
+        new String[] { "h1", "h4" });
+    AMSchedulerTALaunchRequestEvent event2 = createTALaunchReq(jobId, 2, 1024,
+        new String[] { "h2" });
+    AMSchedulerTALaunchRequestEvent event3 = createTALaunchReq(jobId, 3, 1024,
+        new String[] { "h3" });
+
+    scheduler.handleEvent(event1);
+    scheduler.handleEvent(event2);
+    scheduler.handleEvent(event3);
+
+    assertEquals(3, rmComm.addRequests.size());
+
+    // 3 containers on 3 hosts.
+    Container container1 = newContainer(appContext, 1, "h1", 1024, MAP_PRIORITY);
+    Container container2 = newContainer(appContext, 2, "h2", 1024, MAP_PRIORITY);
+    Container container3 = newContainer(appContext, 3, "h3", 1024, MAP_PRIORITY);
+
+    // Simulate two blacklisted nodes.
+    appContext.getAllNodes().handle(
+        new AMNodeEvent(container1.getNodeId(),
+            AMNodeEventType.N_NODE_WAS_BLACKLISTED));
+    appContext.getAllNodes().handle(
+        new AMNodeEvent(container2.getNodeId(),
+            AMNodeEventType.N_NODE_WAS_BLACKLISTED));
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container1.getId());
+    containerIds.add(container2.getId());
+    containerIds.add(container3.getId());
+
+    rmComm.reset();
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+
+    // Only container 3 should have been assigned.
+    checkAssignments(new AMSchedulerTALaunchRequestEvent[] { event3 },
+        eventHandler.launchRequests, eventHandler.assignEvents, true,
+        appContext);
+
+    // Verify container stop events for the remaining two containers.
+    assertEquals(2, eventHandler.stopEvents.size());
+    Set<ContainerId> tmpSet = new HashSet<ContainerId>();
+    tmpSet.add(container1.getId());
+    tmpSet.add(container2.getId());
+    for (AMContainerEvent ame : eventHandler.stopEvents) {
+      tmpSet.remove(ame.getContainerId());
+    }
+    assertEquals(0, tmpSet.size());
+
+    // Verify new request events were sent out to replace these containers.
+    assertEquals(2, rmComm.addRequests.size());
+    // One of the requests should refer to host4.
+    boolean hostSeen = false;
+    for (ContainerRequest c : rmComm.addRequests) {
+      if (c.hosts.length != 0) {
+        if (!hostSeen) {
+          assertEquals(1, c.hosts.length);
+          assertEquals("h4", c.hosts[0]);
+          hostSeen = true;
+        } else {
+          fail("Only one request should have a host");
+        }
+      }
+    }
+  }
+  
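+  /**
+   * Adds and blacklists nodes to verify when blacklisting is ignored (the
+   * blacklisted fraction exceeds the configured 33% threshold) and how that
+   * affects assignment of containers on blacklisted hosts.
+   */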
+  @Test
+  public void testIgnoreBlacklisting() throws Exception {
+    LOG.info("Running testIgnoreBlacklisting");
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
+
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RMContainerAllocatorForTest scheduler = new RMContainerAllocatorForTest(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+
+    int attemptId = 0;
+    int currentHostNum = 0;
+    NodeId[] nodeIds = new NodeId[10];
+
+    // Add a node.
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+
+    // known=1, blacklisted=0. IgnoreBlacklisting=false, Assign 1
+    assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Blacklist node1.
+    blacklistNode(nodeIds[0], appContext);
+
+    // known=1, blacklisted=1. IgnoreBlacklisting=true, Assign 1
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    currentHostNum++;
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    currentHostNum++;
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. (Request
+    // on non blacklisted)
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. (Request
+    // on blacklisted)
+    assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
+        eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    currentHostNum++;
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    // Known=4, blacklisted=1, ignore should be false - assign 1.
+    // (Request on non-blacklisted)
+    assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Known=4, blacklisted=1, ignore should be false - no assignment.
+    // (Request on blacklisted)
+    assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
+        eventHandler, appContext);
+    assertEquals(0, eventHandler.assignEvents.size());
+
+    // Blacklist node2.
+    blacklistNode(nodeIds[1], appContext);
+
+    // Known=4, blacklisted=2, ignore should be true - assign 1 anyway. (Request
+    // on blacklisted)
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[0], ++attemptId, scheduler,
+        eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Blacklist node3. (While ignore is enabled)
+    blacklistNode(nodeIds[2], appContext);
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+
+    currentHostNum++;
+    nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    // Known=4, blacklisted=2, ignore should be true - assign 1 anyway. (Request
+    // on non-blacklisted)
+    assertTrue(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[currentHostNum], ++attemptId,
+        scheduler, eventHandler, appContext);
+    assertEquals(1, eventHandler.assignEvents.size());
+
+    // Add 5 more nodes.
+    for (int i = 0; i < 5; i++) {
+      currentHostNum++;
+      nodeIds[currentHostNum] = addNode(currentHostNum, appContext);
+    }
+
+    // Known=9, blacklisted=3, ignore should be false - no assignment on the blacklisted node3.
+    assertFalse(appContext.getAllNodes().isBlacklistingIgnored());
+    assignContainerOnHost(jobId, nodeIds[2], ++attemptId, scheduler,
+        eventHandler, appContext);
+    assertEquals(0, eventHandler.assignEvents.size());
+
+  }
+
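+  /**
+   * Verifies that the reduce schedule is recalculated only when the
+   * completed-map count reported by the job changes between allocation events.
+   */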
+  @Test
+  public void testCompletedTasksRecalculateSchedule() throws Exception {
+    LOG.info("Running testCompletedTasksRecalculateSchedule");
+    YarnConfiguration conf = new YarnConfiguration();
+
+    TrackingEventHandler eventHandler = new TrackingEventHandler();
+    AppContext appContext = setupDefaultTestContext(eventHandler, conf);
+
+    TrackingAMContainerRequestor rmComm = new TrackingAMContainerRequestor(
+        appContext);
+    rmComm.init(conf);
+    rmComm.start();
+    RecalculateContainerAllocator scheduler = new RecalculateContainerAllocator(
+        rmComm, appContext);
+    scheduler.init(conf);
+    scheduler.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+    Job job = appContext.getJob(jobId);
+    doReturn(10).when(job).getTotalMaps();
+    doReturn(10).when(job).getTotalReduces();
+    doReturn(2).when(job).getCompletedMaps();
+    doReturn(0).when(job).getCompletedReduces();
+
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+
+    // Ignore the first allocate.
+    scheduler.handleEvent(allocatedEvent);
+
+    // Since nothing changed, recalculate reduce = false.
+    scheduler.recalculatedReduceSchedule = false;
+    scheduler.handleEvent(allocatedEvent);
+    assertFalse("Unexpected recalculate of reduce schedule",
+        scheduler.recalculatedReduceSchedule);
+
+    // Change completedMaps. Recalculate reduce should be true.
+    doReturn(1).when(job).getCompletedMaps();
+    scheduler.handleEvent(allocatedEvent);
+    assertTrue("Expected recalculate of reduce schedule",
+        scheduler.recalculatedReduceSchedule);
+
+    // Since nothing changed, recalculate reduce = false.
+    scheduler.recalculatedReduceSchedule = false;
+    scheduler.handleEvent(allocatedEvent);
+    assertFalse("Unexpected recalculate of reduce schedule",
+        scheduler.recalculatedReduceSchedule);
+  }
+  
+  // TODO XXX Unit test for AMNode to simulate node health status change.
+
+  private void blacklistNode(NodeId nodeId, AppContext appContext) {
+    appContext.getAllNodes().handle(
+        new AMNodeEvent(nodeId, AMNodeEventType.N_NODE_WAS_BLACKLISTED));
+  }
+
+  private NodeId addNode(int currentHostNum, AppContext appContext) {
+    NodeId nodeId = BuilderUtils.newNodeId("h" + currentHostNum, port);
+    appContext.getAllNodes().nodeSeen(nodeId);
+    appContext.getAllNodes().handle(
+        new AMNodeEventNodeCountUpdated(appContext.getAllNodes().size()));
+    return nodeId;
+  }
+
+  /**
+   * Generates the events for a TaskAttempt start request and an associated 
+   * container assignment. Resets TrackingEventHandler statistics.
+   */
+  void assignContainerOnHost(JobId jobId, NodeId nodeId, int attemptId,
+      RMContainerAllocatorForTest scheduler, TrackingEventHandler eventHandler,
+      AppContext appContext) {
+    eventHandler.reset();
+    AMSchedulerTALaunchRequestEvent launchRequest = createTALaunchReq(jobId,
+        attemptId, 1024, new String[] { nodeId.getHost() });
+    scheduler.handleEvent(launchRequest);
+    Container container = newContainer(appContext, attemptId, nodeId.getHost(),
+        1024, MAP_PRIORITY);
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(container.getId());
+    AMSchedulerEventContainersAllocated allocatedEvent = new AMSchedulerEventContainersAllocated(
+        containerIds, false);
+    scheduler.handleEvent(allocatedEvent);
+  }
+
+  /**
+   * Verify all assignments and launch requests against the requests.
+   */
+  private void checkAssignments(AMSchedulerTALaunchRequestEvent[] requests,
+      List<AMContainerEventLaunchRequest> launchRequests,
+      List<AMContainerEventAssignTA> assignEvents, boolean checkHostMatch,
+      AppContext appContext) {
+
+    assertNotNull("Containers not assigned", launchRequests);
+    assertNotNull("Containers not assigned", assignEvents);
+    assertEquals("LaunchRequest Count not correct", requests.length,
+        launchRequests.size());
+    assertEquals("Assigned Count not correct", requests.length,
+        assignEvents.size());
+
+    Set<ContainerId> containerIds = new HashSet<ContainerId>();
+    // Check for uniqueness of container id launch requests
+    for (AMContainerEventLaunchRequest launchRequest : launchRequests) {
+      containerIds.add(launchRequest.getContainerId());
+    }
+    assertEquals("Multiple launch requests for same container id",
+        assignEvents.size(), containerIds.size());
+
+    // Check for uniqueness of container Id assignments.
+    containerIds.clear();
+    for (AMContainerEventAssignTA assignEvent : assignEvents) {
+      containerIds.add(assignEvent.getContainerId());
+    }
+    assertEquals("Assigned container Ids not unique", assignEvents.size(),
+        containerIds.size());
+
+    AMContainerEventAssignTA assignment = null;
+    // Check that all requests were assigned a container.
+    for (AMSchedulerTALaunchRequestEvent request : requests) {
+      for (AMContainerEventAssignTA assignEvent : assignEvents) {
+        if (request.getAttemptID().equals(assignEvent.getTaskAttemptId())) {
+          assignment = assignEvent;
+          break;
+        }
+      }
+      checkAssignment(request, assignment, checkHostMatch, appContext);
+      assignment = null;
+    }
+  }
+
+  
+  /**
+   * Verify assignment for a single request / allocation, optionally checking
+   * for the requested host.
+   */
+  private void checkAssignment(AMSchedulerTALaunchRequestEvent request,
+      AMContainerEventAssignTA assignEvent, boolean checkHostMatch,
+      AppContext appContext) {
+
+    Assert.assertNotNull(
+        "Nothing assigned to attempt " + request.getAttemptID(), assignEvent);
+    if (checkHostMatch) {
+      if (request.getHosts().length == 0) {
+        return;
+      } else {
+        Assert.assertTrue(
+            "Not assigned to requested host",
+            Arrays.asList(request.getHosts()).contains(
+                appContext.getAllContainers().get(assignEvent.getContainerId())
+                    .getContainer().getNodeId().getHost()));
+      }
+    }
+  }
+
+  
+  /**
+   * Create containers for allocation. Will also register the associated node 
+   * with the AMNodeMap, and the container with the AMContainerMap.
+   */
+  private Container newContainer(AppContext appContext,
+      int containerNum, String host, int memory, Priority priority) {
+    ContainerId containerId = BuilderUtils.newContainerId(appContext.getApplicationAttemptId(),
+        containerNum);
+    NodeId nodeId = BuilderUtils.newNodeId(host, port);
+    
+    appContext.getAllNodes().nodeSeen(nodeId);
+    Resource resource = BuilderUtils.newResource(memory, 1);
+    Container container = BuilderUtils.newContainer(containerId, nodeId, host
+        + ":8000", resource, priority, null);
+    appContext.getAllContainers().addContainerIfNew(container);
+    return container;
+  }
+  
+  
+  private AMSchedulerTALaunchRequestEvent createTALaunchReq(JobId jobId,
+      int taskAttemptId, int memory, String[] hosts) {
+    return createTALaunchReq(jobId, taskAttemptId, memory, hosts, false, false);
+  }
+  
+  private AMSchedulerTALaunchRequestEvent createTALaunchReq(JobId jobId,
+      int taskAttemptId, int memory, String[] hosts,
+      boolean earlierFailedAttempt, boolean reduce) {
+    Resource resource = BuilderUtils.newResource(memory, 1);
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    String[] hostArray;
+    String[] rackArray;
+    if (earlierFailedAttempt) {
+      hostArray = new String[0];
+      rackArray = new String[0];
+    } else {
+      hostArray = hosts;
+      rackArray = new String[] { NetworkTopology.DEFAULT_RACK };
+    }
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+        taskAttemptId);
+    AMSchedulerTALaunchRequestEvent event = new AMSchedulerTALaunchRequestEvent(
+        attemptId, earlierFailedAttempt, resource, null, null, null, null,
+        hostArray, rackArray);
+    return event;
+  }
+  
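+  /**
+   * Test subclass that skips the allocator's normal startup, exposes
+   * handleEvent() and the map/reduce/fast-fail-map priorities, and disables
+   * task attempt profiling.
+   */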
+  static class RMContainerAllocatorForTest extends RMContainerAllocator {
+
+    public RMContainerAllocatorForTest(ContainerRequestor requestor,
+        AppContext appContext) {
+      super(requestor, appContext);
+    }
+
+    @Override
+    public void start() {
+      this.job = appContext.getJob(jobId);
+    }
+
+    @Override
+    protected void handleEvent(AMSchedulerEvent event) {
+      super.handleEvent(event);
+    }
+
+    @Override
+    protected boolean shouldProfileTaskAttempt(JobConf conf,
+        MRTaskContext remoteTaskContext) {
+      return false;
+    }
+
+    static Priority getMapPriority() {
+      return BuilderUtils.newPriority(PRIORITY_MAP.getPriority());
+    }
+
+    static Priority getReducePriority() {
+      return BuilderUtils.newPriority(PRIORITY_REDUCE.getPriority());
+    }
+
+    static Priority getFailedMapPriority() {
+      return BuilderUtils.newPriority(PRIORITY_FAST_FAIL_MAP.getPriority());
+    }
+  }
+
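+  /**
+   * Records whether scheduleReduces() was invoked instead of performing any
+   * actual reduce scheduling.
+   */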
+  static class RecalculateContainerAllocator extends
+      RMContainerAllocatorForTest {
+
+    boolean recalculatedReduceSchedule = false;
+
+    public RecalculateContainerAllocator(ContainerRequestor requestor,
+        AppContext appContext) {
+      super(requestor, appContext);
+    }
+
+    @Override
+    public void scheduleReduces(int totalMaps, int completedMaps,
+        int scheduledMaps, int scheduledReduces, int assignedMaps,
+        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+        int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+      recalculatedReduceSchedule = true;
+    }
+    
+    @Override
+    protected boolean shouldProfileTaskAttempt(JobConf conf,
+        MRTaskContext remoteTaskContext) {
+      return false;
+    }
+  }
+
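+  /**
+   * Requestor that records added and decremented container requests in lists
+   * for later assertions; all RM communication is stubbed out.
+   */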
+  class TrackingAMContainerRequestor extends RMContainerRequestor {
+
+    List<ContainerRequest> addRequests = new LinkedList<RMContainerRequestor.ContainerRequest>();
+    List<ContainerRequest> decRequests = new LinkedList<RMContainerRequestor.ContainerRequest>();
+    private Resource minContainerMemory = Records.newRecord(Resource.class);
+    private Resource maxContainerMemory = Records.newRecord(Resource.class);
+    
+    void reset() {
+      addRequests.clear();
+      decRequests.clear();
+    }
+
+    public TrackingAMContainerRequestor(AppContext context) {
+      this(context, 1024, 10240);
+    }
+
+    public TrackingAMContainerRequestor(AppContext context,
+        int minContainerMemory, int maxContainerMemory) {
+      super(null, context);
+      this.minContainerMemory.setMemory(minContainerMemory);
+      this.maxContainerMemory.setMemory(maxContainerMemory);
+    }
+   
+    @Override
+    public void addContainerReq(ContainerRequest request) {
+      addRequests.add(request);
+    }
+    
+    @Override
+    public void decContainerReq(ContainerRequest request) {
+      decRequests.add(request);
+    }
+    
+    @Override
+    public Map<ApplicationAccessType, String> getApplicationAcls() {
+      return null;
+    }
+    
+    @Override
+    public Resource getAvailableResources() {
+      return BuilderUtils.newResource(0, 1);
+    }
+    
+    @Override
+    protected Resource getMaxContainerCapability() {
+      return maxContainerMemory;
+    }
+    
+    @Override
+    protected Resource getMinContainerCapability() {
+      return minContainerMemory;
+    }
+
+    @Override
+    public void register() {
+    }
+
+    @Override
+    public void unregister() {
+    }
+
+    @Override
+    public void startAllocatorThread() {
+    }
+    
+    @Override
+    public AMRMProtocol createSchedulerProxy() {
+      return null;
+    }
+  }
+  
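+  /**
+   * Event handler that captures launch, assign, stop, and deallocate events
+   * emitted by the allocator so tests can assert on them.
+   */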
+  @SuppressWarnings("rawtypes")
+  private class TrackingEventHandler implements EventHandler {
+
+    List<AMContainerEventLaunchRequest> launchRequests = new LinkedList<AMContainerEventLaunchRequest>();
+    List<AMContainerEventAssignTA> assignEvents = new LinkedList<AMContainerEventAssignTA>();
+    List<AMContainerEvent> stopEvents = new LinkedList<AMContainerEvent>();
+    List<RMCommunicatorContainerDeAllocateRequestEvent> releaseEvents = new LinkedList<RMCommunicatorContainerDeAllocateRequestEvent>();
+    
+    @Override
+    public void handle(Event event) {
+      if (event.getType() == AMContainerEventType.C_LAUNCH_REQUEST) {
+        launchRequests.add((AMContainerEventLaunchRequest)event);
+      } else if (event.getType() == AMContainerEventType.C_ASSIGN_TA) {
+        assignEvents.add((AMContainerEventAssignTA)event);
+      } else if (event.getType() == AMContainerEventType.C_STOP_REQUEST) {
+        stopEvents.add((AMContainerEvent)event);
+      } else if (event.getType() == RMCommunicatorEventType.CONTAINER_DEALLOCATE) {
+        releaseEvents.add((RMCommunicatorContainerDeAllocateRequestEvent)event);
+      }
+    }
+    
+    public void reset() {
+      this.launchRequests.clear();
+      this.assignEvents.clear();
+      this.stopEvents.clear();
+    }
+  }
+
+  // TODO Allow specifying the jobId as a parameter
+  @SuppressWarnings("rawtypes") 
+  private AppContext setupDefaultTestContext(EventHandler eventHandler,
+      Configuration conf) {
+    AppContext appContext = mock(AppContext.class);
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
+    JobID id = TypeConverter.fromYarn(appId);
+    JobId jobId = TypeConverter.toYarn(id);
+
+    Job mockJob = mock(Job.class);
+    when(mockJob.getID()).thenReturn(jobId);
+    when(mockJob.getProgress()).thenReturn(0.0f);
+    when(mockJob.getConf()).thenReturn(conf);
+
+    Clock clock = new ControlledClock(new SystemClock());
+
+    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    amNodeMap.init(conf);
+    amNodeMap.start();
+
+    AMContainerMap amContainerMap = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appContext);
+    amContainerMap.init(conf);
+    amContainerMap.start();
+    when(appContext.getAllContainers()).thenReturn(amContainerMap);
+
+    when(appContext.getApplicationID()).thenReturn(appId);
+    when(appContext.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(appContext.getEventHandler()).thenReturn(eventHandler);
+    when(appContext.getJob(jobId)).thenReturn(mockJob);
+    when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getAllNodes()).thenReturn(amNodeMap);
+    when(appContext.getClusterInfo()).thenReturn(
+        new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
+            .newResource(10240, 1)));
+
+    return appContext;
+  }
+  
+  // TODO Add a unit test to verify a correct launchContainer invocation with
+  // security.
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,366 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Test;
+
+public class TestRMContainerRequestor {
+  
+  @Test
+  public void testFailedAllocate() throws Exception {
+    AppContext appContext = setupDefaultTestContext();
+    AMRMProtocolForFailedAllocate amrm = createAMRMProtocolForFailedAllocate();
+    RMContainerRequestorForTest rmComm = new RMContainerRequestorForTest(appContext, amrm);
+    amrm.setRmCommunicator(rmComm);
+    rmComm.init(new YarnConfiguration());
+    rmComm.start();
+    
+    Resource resource = BuilderUtils.newResource(512, 1);
+    String [] hosts = new String[]{"host1", "host2"};
+    String [] racks = new String[]{"rack1"};
+    Priority priority = BuilderUtils.newPriority(5);
+    ContainerRequest cr1 = new ContainerRequest(resource, hosts, racks, priority);
+    ContainerRequest cr2 = new ContainerRequest(resource, new String[]{"host1"}, racks, priority);
+    
+    rmComm.addContainerReq(cr1);
+    rmComm.addContainerReq(cr2);
+    
+
+    // Set containerRequest to be decremented.
+    amrm.setIncContainerRequest(cr1);
+    amrm.setDecContainerRequest(cr2);
+    
+    // Verify initial ask.
+    Set<ResourceRequest> askSet = null;
+    askSet = rmComm.getAskSet();
+    assertEquals(4, askSet.size()); // 2 hosts, 1 rack, and *.
+    verifyAsks(askSet, 2, 1, 2, 2);
+    
+    // First heartbeat.
+    rmComm.heartbeat();
+    // The ask table is cleared after a successful heartbeat.
+    askSet = rmComm.getAskSet();
+    assertEquals(0, askSet.size());
+    
+    // Add 2 more container requests.
+    rmComm.addContainerReq(cr1);
+    rmComm.addContainerReq(cr2);
+    
+    // Verify the new asks.
+    askSet = rmComm.getAskSet();
+    assertEquals(4, askSet.size());
+    verifyAsks(askSet, 4, 2, 4, 4);
+    
+    try {
+      rmComm.heartbeat();
+      Assert.fail("Second heartbeat was expected to fail");
+    } catch (YarnRemoteException yre) {
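+      // Expected: the mock AMRMProtocol throws on the second allocate call.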
+    }
+    
+    // Verify the ask. The failed allocate first added cr1 (-> 5, 3, 5, 5) and
+    // then decremented cr2 (-> 4, 3, 4, 4), ordered as (host1, host2, rack1, *).
+    assertEquals(4, askSet.size());
+    verifyAsks(askSet, 4, 3, 4, 4);
+  }
+  
+  /**
+   * Verify job progress is being reported to the RM.
+   */
+  @Test
+  public void testProgressReportedToRM() throws Exception {
+    AppContext appContext = setupDefaultTestContext();
+    TrackingAMRMProtocol amrm = new TrackingAMRMProtocol();
+    RMContainerRequestorForTest rmComm = new RMContainerRequestorForTest(appContext, amrm);
+    rmComm.init(new YarnConfiguration());
+    rmComm.start();
+
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appContext
+        .getApplicationID()));
+    Job job = appContext.getJob(jobId);
+    
+    rmComm.heartbeat();
+    assertEquals(0.0f, amrm.allocateRequest.getProgress(), 0.001);
+    
+    doReturn(0.11f).when(job).getProgress();
+    rmComm.heartbeat();
+    assertEquals(0.11f, amrm.allocateRequest.getProgress(), 0.001);
+    
+    doReturn(0.95f).when(job).getProgress();
+    rmComm.heartbeat();
+    assertEquals(0.95f, amrm.allocateRequest.getProgress(), 0.001);
+  }
+
+  private void verifyAsks(Set<ResourceRequest> askSet, int host1, int host2, int rack1, int generic) {
+    for (ResourceRequest rr : askSet) {
+      if (rr.getHostName().equals("*")) {
+        assertEquals(generic, rr.getNumContainers());
+      } else if (rr.getHostName().equals("host1")) {
+        assertEquals(host1, rr.getNumContainers());
+      } else if (rr.getHostName().equals("host2")) {
+        assertEquals(host2, rr.getNumContainers());
+      } else if (rr.getHostName().equals("rack1")) {
+        assertEquals(rack1, rr.getNumContainers());
+      }
+    }
+  }
+  
+  private AMRMProtocolForFailedAllocate createAMRMProtocolForFailedAllocate() {
+    AMResponse amResponse = 
+        newAMResponse(new ArrayList<Container>(),
+            BuilderUtils.newResource(1024, 1), new ArrayList<ContainerStatus>(),
+            false, 1, new ArrayList<NodeReport>());
+    AllocateResponse allocateResponse = newAllocateResponse(
+        amResponse, 2);
+    return new AMRMProtocolForFailedAllocate(allocateResponse);
+  }
+
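+  /**
+   * AMRMProtocol stub whose allocate() succeeds on the first call and, on the
+   * second, mutates the requestor's request table (add crInc, decrement crDec)
+   * before throwing a remote exception.
+   */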
+  class AMRMProtocolForFailedAllocate implements AMRMProtocol {
+    private AllocateResponse allocateResponse;
+    private RMContainerRequestor rmComm;
+    private ContainerRequest crInc;
+    private ContainerRequest crDec;
+
+    AMRMProtocolForFailedAllocate(AllocateResponse response) {
+      allocateResponse = response;
+    }
+    
+    void setRmCommunicator(RMContainerRequestor rmComm) {
+      this.rmComm = rmComm;
+    }
+    
+    void setIncContainerRequest(ContainerRequest cr) {
+      this.crInc = cr;
+    }
+    
+    void setDecContainerRequest(ContainerRequest cr) {
+      this.crDec = cr;
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnRemoteException {
+      if (request.getResponseId() == 0) {
+        return allocateResponse;
+      } else if (request.getResponseId() == 1) {
+        // Change the table before throwing the exception.
+        rmComm.addContainerReq(crInc);
+        rmComm.decContainerReq(crDec);
+        throw RPCUtil.getRemoteException("MockRpcError");
+      }
+      return null;
+    }
+  }
+
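+  /**
+   * Requestor wired to the supplied AMRMProtocol (or a default mock returning
+   * an empty allocation); registration and the allocator thread are no-ops.
+   */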
+  class RMContainerRequestorForTest extends RMContainerRequestor {
+
+    private AMRMProtocol amRmProtocol;
+
+    public RMContainerRequestorForTest(AppContext context, AMRMProtocol amrm) {
+      super(null, context);
+      this.amRmProtocol = amrm;
+    }
+    
+    @Override
+    public AMRMProtocol createSchedulerProxy() {
+      if (amRmProtocol == null) {
+        amRmProtocol = mock(AMRMProtocol.class);
+        AMResponse amResponse = newAMResponse(
+            new ArrayList<Container>(), BuilderUtils.newResource(1024, 1),
+            new ArrayList<ContainerStatus>(), false, 1,
+            new ArrayList<NodeReport>());
+        AllocateResponse allocateResponse = newAllocateResponse(
+            amResponse, 2);
+        try {
+          when(amRmProtocol.allocate(any(AllocateRequest.class))).thenReturn(allocateResponse);
+        } catch (YarnRemoteException e) {
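+          // Ignored: the mocked call is only being stubbed here.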
+        }
+      }
+      return amRmProtocol;
+    }
+    
+    @Override public void register() {}
+    @Override public void unregister() {}
+      
+    @Override public void startAllocatorThread() {}
+  }
+
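+  /**
+   * AMRMProtocol stub that records the most recent register, finish, and
+   * allocate requests and returns a canned empty allocation.
+   */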
+  private static class TrackingAMRMProtocol implements AMRMProtocol {
+
+    RegisterApplicationMasterRequest registerRequest;
+    FinishApplicationMasterRequest finishApplicationMasterRequest;
+    AllocateRequest allocateRequest;
+
+    public void reset() {
+      this.registerRequest = null;
+      this.finishApplicationMasterRequest = null;
+      this.allocateRequest = null;
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnRemoteException {
+      this.registerRequest = request;
+      return null;
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnRemoteException {
+      this.finishApplicationMasterRequest = request;
+      return null;
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnRemoteException {
+      this.allocateRequest = request;
+      AMResponse amResponse = newAMResponse(
+          new ArrayList<Container>(), BuilderUtils.newResource(1024, 1),
+          new ArrayList<ContainerStatus>(), false, 1,
+          new ArrayList<NodeReport>());
+      AllocateResponse allocateResponse = newAllocateResponse(
+          amResponse, 2);
+      return allocateResponse;
+    }
+  }
+
+  private AppContext setupDefaultTestContext() {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
+    JobID id = TypeConverter.fromYarn(appId);
+    JobId jobId = TypeConverter.toYarn(id);
+
+    Job mockJob = mock(Job.class);
+    doReturn(0.0f).when(mockJob).getProgress();
+    doReturn(jobId).when(mockJob).getID();
+
+    @SuppressWarnings("rawtypes")
+    EventHandler handler = mock(EventHandler.class);
+
+    Clock clock = new ControlledClock(new SystemClock());
+    
+    AMNodeMap amNodeMap = mock(AMNodeMap.class);
+    when(amNodeMap.isHostBlackListed(any(String.class))).thenReturn(false);
+    
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getApplicationID()).thenReturn(appId);
+    when(appContext.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(appContext.getEventHandler()).thenReturn(handler);
+    when(appContext.getJob(jobId)).thenReturn(mockJob);
+    when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getAllNodes()).thenReturn(amNodeMap);
+
+    return appContext;
+  }
+  
+  
+  // TODO XXX Move all of these into BuilderUtils
+  public static AllocateRequest newAllocateRequest(
+      ApplicationAttemptId applicationAttemptId, int responseID,
+      float appProgress, List<ResourceRequest> resourceAsk,
+      List<ContainerId> containersToBeReleased) {
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+    allocateRequest.setApplicationAttemptId(applicationAttemptId);
+    allocateRequest.setResponseId(responseID);
+    allocateRequest.setProgress(appProgress);
+    allocateRequest.addAllAsks(resourceAsk);
+    allocateRequest.addAllReleases(containersToBeReleased);
+    return allocateRequest;
+  }
+  
+  public static AllocateResponse newAllocateResponse(AMResponse amResponse,
+      int numNodes) {
+    AllocateResponse response = Records.newRecord(AllocateResponse.class);
+    response.setAMResponse(amResponse);
+    response.setNumClusterNodes(numNodes);
+    return response;
+  }
+  
+  public static AMResponse newAMResponse(List<Container> allocated,
+      Resource available, List<ContainerStatus> completed, boolean reboot,
+      int responseId, List<NodeReport> nodeUpdates) {
+    AMResponse amResponse = Records.newRecord(AMResponse.class);
+    amResponse.setAllocatedContainers(allocated);
+    amResponse.setAvailableResources(available);
+    amResponse.setCompletedContainersStatuses(completed);
+    amResponse.setReboot(reboot);
+    amResponse.setResponseId(responseId);
+    amResponse.setUpdatedNodes(nodeUpdates);
+    return amResponse;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/TestAMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/TestAMContainerHelpers.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/TestAMContainerHelpers.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/TestAMContainerHelpers.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestAMContainerHelpers {
+
+  // WARNING: This test must be the only test in this file. This is because
+  // there is an optimization where the credentials passed in are cached
+  // statically, so they do not need to be recomputed when creating a new
+  // ContainerLaunchContext. If other tests run first, this code will cache
+  // their credentials and this test will fail when it looks for the
+  // credentials it inserted.
+
+  @Test
+  public void testCLCConstruction() throws Exception {
+    final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
+    final byte[] SECRET_KEY = ("secretkey").getBytes();
+    Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>(
+        1);
+    acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+
+    // setup UGI for security so tokens and keys are preserved
+    jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(jobConf);
+
+    Credentials credentials = new Credentials();
+    credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
+    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+        ("tokenid").getBytes(), ("tokenpw").getBytes(), new Text("tokenkind"),
+        new Text("tokenservice"));
+
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, BuilderUtils
+        .newApplicationAttemptId(appId, 1).toString());
+    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+
+    ContainerLaunchContext launchCtx = AMContainerHelpers
+        .createContainerLaunchContext(acls, containerId, jobConf, TaskType.MAP,
+            jobToken, TypeConverter.fromYarn(jobId), mock(Resource.class),
+            containerId, taListener, credentials, false);
+
+    Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
+    Credentials launchCredentials = new Credentials();
+
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(launchCtx.getContainerTokens());
+    launchCredentials.readTokenStorageStream(dibb);
+
+    // verify all tokens specified for the task attempt are in the launch
+    // context
+    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
+      Token<? extends TokenIdentifier> launchToken = launchCredentials
+          .getToken(token.getService());
+      Assert.assertNotNull("Token " + token.getService() + " is missing",
+          launchToken);
+      Assert.assertEquals("Token " + token.getService() + " mismatch", token,
+          launchToken);
+    }
+
+    // verify the secret key is in the launch context
+    Assert.assertNotNull("Secret key missing",
+        launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
+    Assert.assertTrue(
+        "Secret key mismatch",
+        Arrays.equals(SECRET_KEY,
+            launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
+  }
+
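+  // Local filesystem stub that returns a fixed FileStatus for any path.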
+  static public class StubbedFS extends RawLocalFileSystem {
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return new FileStatus(1, false, 1, 1, 1, f);
+    }
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TestDataStatistics.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TestDataStatistics.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TestDataStatistics.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TestDataStatistics.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,73 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDataStatistics {
+
+  private static final double TOL = 0.001;
+
+  @Test
+  public void testEmptyDataStatistics() throws Exception {
+    DataStatistics statistics = new DataStatistics();
+    Assert.assertEquals(0, statistics.count(), TOL);
+    Assert.assertEquals(0, statistics.mean(), TOL);
+    Assert.assertEquals(0, statistics.var(), TOL);
+    Assert.assertEquals(0, statistics.std(), TOL);
+    Assert.assertEquals(0, statistics.outlier(1.0f), TOL);
+  }
+  
+  @Test
+  public void testSingleEntryDataStatistics() throws Exception {
+    DataStatistics statistics = new DataStatistics(17.29);
+    Assert.assertEquals(1, statistics.count(), TOL);
+    Assert.assertEquals(17.29, statistics.mean(), TOL);
+    Assert.assertEquals(0, statistics.var(), TOL);
+    Assert.assertEquals(0, statistics.std(), TOL);
+    Assert.assertEquals(17.29, statistics.outlier(1.0f), TOL);
+  }
+  
+  @Test
+  public void testMultiEntryDataStatistics() throws Exception {
+    DataStatistics statistics = new DataStatistics();
+    statistics.add(17);
+    statistics.add(29);
+    Assert.assertEquals(2, statistics.count(), TOL);
+    Assert.assertEquals(23.0, statistics.mean(), TOL);
+    Assert.assertEquals(36.0, statistics.var(), TOL);
+    Assert.assertEquals(6.0, statistics.std(), TOL);
+    Assert.assertEquals(29.0, statistics.outlier(1.0f), TOL);
+  }
+  
+  @Test
+  public void testUpdateStatistics() throws Exception {
+    DataStatistics statistics = new DataStatistics(17);
+    statistics.add(29);
+    Assert.assertEquals(2, statistics.count(), TOL);
+    Assert.assertEquals(23.0, statistics.mean(), TOL);
+    Assert.assertEquals(36.0, statistics.var(), TOL);
+
+    statistics.updateStatistics(17, 29);
+    Assert.assertEquals(2, statistics.count(), TOL);
+    Assert.assertEquals(29.0, statistics.mean(), TOL);
+    Assert.assertEquals(0.0, statistics.var(), TOL);
+  }
+}


