hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1376283 [19/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/s...
Date Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.junit.Test;
+
+public class TestMapReduceChildJVM {
+
+  private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
+
+  /**
+   * End-to-end check of the child-JVM command line generated by the MR
+   * ApplicationMaster: runs a 1-map / 0-reduce job through MRApp to
+   * completion, then asserts the exact launch command captured by the
+   * mock container launcher.
+   */
+  @Test
+  public void testCommandLine() throws Exception {
+
+    MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    // TODO XXX: Change classname after renaming back to YarnChild
+    // NOTE(review): the run of two spaces before "-Xmx200m" below is part of
+    // the expected string — presumably the command builder emits an empty
+    // token there. Confirm against the generator before "fixing" it.
+    Assert.assertEquals(
+      "[exec $JAVA_HOME/bin/java" +
+      " -Djava.net.preferIPv4Stack=true" +
+      " -Dhadoop.metrics.log.level=WARN" +
+      "  -Xmx200m -Djava.io.tmpdir=$PWD/tmp" +
+      " -Dlog4j.configuration=container-log4j.properties" +
+      " -Dyarn.app.mapreduce.container.log.dir=<LOG_DIR>" +
+      " -Dyarn.app.mapreduce.container.log.filesize=0" +
+      " -Dhadoop.root.logger=INFO,CLA" +
+      " org.apache.hadoop.mapred.YarnChild2 127.0.0.1" +
+      " 54321" +
+      " attempt_0_0000_m_000000_0" +
+      " 0" +
+      " 1><LOG_DIR>/stdout" +
+      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+  }
+
+  // MRApp subclass whose mock container launcher records the launch command
+  // of the last CONTAINER_LAUNCH_REQUEST event it handles.
+  private static final class MyMRApp extends MRApp {
+
+    // toString() of the container's command List, e.g.
+    // "[exec $JAVA_HOME/bin/java ... ]". Read by testCommandLine().
+    private String myCommandLine;
+
+    public MyMRApp(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new MockContainerLauncher() {
+        @Override
+        public void handle(NMCommunicatorEvent event) {
+          // Intercept only launch requests to capture the command line;
+          // every event (including these) is still forwarded to the mock
+          // launcher via super.handle() so normal test flow continues.
+          if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
+            NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+            ContainerLaunchContext launchContext = launchEvent.getContainerLaunchContext();
+            String cmdString = launchContext.getCommands().toString();
+            LOG.info("launchContext " + cmdString);
+            myCommandLine = cmdString;
+          }
+          super.handle(event);
+        }
+      };
+    }
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,650 @@
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+//
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertFalse;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.spy;
+//import static org.mockito.Mockito.times;
+//import static org.mockito.Mockito.verify;
+//import static org.mockito.Mockito.when;
+//
+//import java.io.IOException;
+//import java.net.InetSocketAddress;
+//import java.util.Arrays;
+//import java.util.HashMap;
+//import java.util.Iterator;
+//import java.util.Map;
+//
+//import junit.framework.Assert;
+//
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+//import org.apache.hadoop.fs.FileStatus;
+//import org.apache.hadoop.fs.FileSystem;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.fs.RawLocalFileSystem;
+//import org.apache.hadoop.io.DataInputByteBuffer;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapred.JobConf;
+//import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+//import org.apache.hadoop.mapred.WrappedJvmID;
+//import org.apache.hadoop.mapreduce.JobCounter;
+//import org.apache.hadoop.mapreduce.MRJobConfig;
+//import org.apache.hadoop.mapreduce.OutputCommitter;
+//import org.apache.hadoop.mapreduce.TypeConverter;
+//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+//import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+//import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+//import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+//import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+//import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+//import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+//import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+//import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+//import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+//import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+//import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+//import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerLaunchedEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestEvent;
+//import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+//import org.apache.hadoop.security.Credentials;
+//import org.apache.hadoop.security.UserGroupInformation;
+//import org.apache.hadoop.security.token.Token;
+//import org.apache.hadoop.security.token.TokenIdentifier;
+//import org.apache.hadoop.yarn.Clock;
+//import org.apache.hadoop.yarn.ClusterInfo;
+//import org.apache.hadoop.yarn.SystemClock;
+//import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+//import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+//import org.apache.hadoop.yarn.api.records.ApplicationId;
+//import org.apache.hadoop.yarn.api.records.Container;
+//import org.apache.hadoop.yarn.api.records.ContainerId;
+//import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+//import org.apache.hadoop.yarn.api.records.NodeId;
+//import org.apache.hadoop.yarn.api.records.Resource;
+//import org.apache.hadoop.yarn.event.Event;
+//import org.apache.hadoop.yarn.event.EventHandler;
+//import org.apache.hadoop.yarn.util.BuilderUtils;
+//import org.junit.Test;
+//import org.mockito.ArgumentCaptor;
+//
+//@SuppressWarnings({"unchecked", "rawtypes"})
+//public class TestTaskAttempt{
+//  @Test
+//  public void testAttemptContainerRequest() throws Exception {
+//    //WARNING: This test must run first.  This is because there is an 
+//    // optimization where the credentials passed in are cached statically so 
+//    // they do not need to be recomputed when creating a new 
+//    // ContainerLaunchContext. if other tests run first this code will cache
+//    // their credentials and this test will fail trying to look for the
+//    // credentials it inserted in.
+//    final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
+//    final byte[] SECRET_KEY = ("secretkey").getBytes();
+//    Map<ApplicationAccessType, String> acls =
+//        new HashMap<ApplicationAccessType, String>(1);
+//    acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
+//    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+//    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+//    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+//    Path jobFile = mock(Path.class);
+//
+//    EventHandler eventHandler = mock(EventHandler.class);
+//    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+//    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+//
+//    JobConf jobConf = new JobConf();
+//    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+//    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+//    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+//
+//    // setup UGI for security so tokens and keys are preserved
+//    jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+//    UserGroupInformation.setConfiguration(jobConf);
+//
+//    Credentials credentials = new Credentials();
+//    credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
+//    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+//        ("tokenid").getBytes(), ("tokenpw").getBytes(),
+//        new Text("tokenkind"), new Text("tokenservice"));
+//    
+//    TaskAttemptImpl taImpl =
+//        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+//            mock(TaskSplitMetaInfo.class), jobConf, taListener,
+//            mock(OutputCommitter.class), jobToken, credentials,
+//            new SystemClock(), null);
+//
+//    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
+//    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+//    
+//    ContainerLaunchContext launchCtx =
+//        TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
+//            jobConf, jobToken, taImpl.createRemoteTask(),
+//            TypeConverter.fromYarn(jobId), mock(Resource.class),
+//            mock(WrappedJvmID.class), taListener,
+//            credentials);
+//
+//    Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
+//    Credentials launchCredentials = new Credentials();
+//
+//    DataInputByteBuffer dibb = new DataInputByteBuffer();
+//    dibb.reset(launchCtx.getContainerTokens());
+//    launchCredentials.readTokenStorageStream(dibb);
+//
+//    // verify all tokens specified for the task attempt are in the launch context
+//    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
+//      Token<? extends TokenIdentifier> launchToken =
+//          launchCredentials.getToken(token.getService());
+//      Assert.assertNotNull("Token " + token.getService() + " is missing",
+//          launchToken);
+//      Assert.assertEquals("Token " + token.getService() + " mismatch",
+//          token, launchToken);
+//    }
+//
+//    // verify the secret key is in the launch context
+//    Assert.assertNotNull("Secret key missing",
+//        launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
+//    Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
+//        launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
+//  }
+//
+//  static public class StubbedFS extends RawLocalFileSystem {
+//    @Override
+//    public FileStatus getFileStatus(Path f) throws IOException {
+//      return new FileStatus(1, false, 1, 1, 1, f);
+//    }
+//  }
+//
+//  @Test
+//  public void testMRAppHistoryForMap() throws Exception {
+//    MRApp app = new FailingAttemptsMRApp(1, 0);
+//    testMRAppHistory(app);
+//  }
+//
+//  @Test
+//  public void testMRAppHistoryForReduce() throws Exception {
+//    MRApp app = new FailingAttemptsMRApp(0, 1);
+//    testMRAppHistory(app);
+//  }
+//
+//  @Test
+//  public void testSingleRackRequest() throws Exception {
+//    TaskAttemptImpl.RequestContainerTransition rct =
+//        new TaskAttemptImpl.RequestContainerTransition(false);
+//
+//    EventHandler eventHandler = mock(EventHandler.class);
+//    String[] hosts = new String[3];
+//    hosts[0] = "host1";
+//    hosts[1] = "host2";
+//    hosts[2] = "host3";
+//    TaskSplitMetaInfo splitInfo =
+//        new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+//
+//    TaskAttemptImpl mockTaskAttempt =
+//        createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+//    TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+//
+//    rct.transition(mockTaskAttempt, mockTAEvent);
+//
+//    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+//    verify(eventHandler, times(2)).handle(arg.capture());
+//    if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+//      Assert.fail("Second Event not of type ContainerRequestEvent");
+//    }
+//    ContainerRequestEvent cre =
+//        (ContainerRequestEvent) arg.getAllValues().get(1);
+//    String[] requestedRacks = cre.getRacks();
+//    //Only a single occurrence of /DefaultRack
+//    assertEquals(1, requestedRacks.length);
+//  }
+// 
+//  @Test
+//  public void testHostResolveAttempt() throws Exception {
+//    TaskAttemptImpl.RequestContainerTransition rct =
+//        new TaskAttemptImpl.RequestContainerTransition(false);
+//
+//    EventHandler eventHandler = mock(EventHandler.class);
+//    String[] hosts = new String[3];
+//    hosts[0] = "192.168.1.1";
+//    hosts[1] = "host2";
+//    hosts[2] = "host3";
+//    TaskSplitMetaInfo splitInfo =
+//        new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+//
+//    TaskAttemptImpl mockTaskAttempt =
+//        createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+//    TaskAttemptImpl spyTa = spy(mockTaskAttempt);
+//    when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+//
+//    TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+//    rct.transition(spyTa, mockTAEvent);
+//    verify(spyTa).resolveHost(hosts[0]);
+//    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+//    verify(eventHandler, times(2)).handle(arg.capture());
+//    if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+//      Assert.fail("Second Event not of type ContainerRequestEvent");
+//    }
+//    Map<String, Boolean> expected = new HashMap<String, Boolean>();
+//    expected.put("host1", true);
+//    expected.put("host2", true);
+//    expected.put("host3", true);
+//    ContainerRequestEvent cre =
+//        (ContainerRequestEvent) arg.getAllValues().get(1);
+//    String[] requestedHosts = cre.getHosts();
+//    for (String h : requestedHosts) {
+//      expected.remove(h);
+//    }
+//    assertEquals(0, expected.size());
+//  }
+//  
+//  @Test
+//  public void testSlotMillisCounterUpdate() throws Exception {
+//    verifySlotMillis(2048, 2048, 1024);
+//    verifySlotMillis(2048, 1024, 1024);
+//    verifySlotMillis(10240, 1024, 2048);
+//  }
+//
+//  public void verifySlotMillis(int mapMemMb, int reduceMemMb,
+//      int minContainerSize) throws Exception {
+//    Clock actualClock = new SystemClock();
+//    ControlledClock clock = new ControlledClock(actualClock);
+//    clock.setTime(10);
+//    MRApp app =
+//        new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock);
+//    Configuration conf = new Configuration();
+//    conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
+//    conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
+//    app.setClusterInfo(new ClusterInfo(BuilderUtils
+//        .newResource(minContainerSize), BuilderUtils.newResource(10240)));
+//
+//    Job job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    Map<TaskId, Task> tasks = job.getTasks();
+//    Assert.assertEquals("Num tasks is not correct", 2, tasks.size());
+//    Iterator<Task> taskIter = tasks.values().iterator();
+//    Task mTask = taskIter.next();
+//    app.waitForState(mTask, TaskState.RUNNING);
+//    Task rTask = taskIter.next();
+//    app.waitForState(rTask, TaskState.RUNNING);
+//    Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts();
+//    Assert.assertEquals("Num attempts is not correct", 1, mAttempts.size());
+//    Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts();
+//    Assert.assertEquals("Num attempts is not correct", 1, rAttempts.size());
+//    TaskAttempt mta = mAttempts.values().iterator().next();
+//    TaskAttempt rta = rAttempts.values().iterator().next();
+//    app.waitForState(mta, TaskAttemptState.RUNNING);
+//    app.waitForState(rta, TaskAttemptState.RUNNING);
+//
+//    clock.setTime(11);
+//    app.getContext()
+//        .getEventHandler()
+//        .handle(new TaskAttemptEvent(mta.getID(), TaskAttemptEventType.TA_DONE));
+//    app.getContext()
+//        .getEventHandler()
+//        .handle(new TaskAttemptEvent(rta.getID(), TaskAttemptEventType.TA_DONE));
+//    app.waitForState(job, JobState.SUCCEEDED);
+//    Assert.assertEquals(mta.getFinishTime(), 11);
+//    Assert.assertEquals(mta.getLaunchTime(), 10);
+//    Assert.assertEquals(rta.getFinishTime(), 11);
+//    Assert.assertEquals(rta.getLaunchTime(), 10);
+//    Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
+//        job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS)
+//            .getValue());
+//    Assert.assertEquals(
+//        (int) Math.ceil((float) reduceMemMb / minContainerSize), job
+//            .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
+//            .getValue());
+//  }
+//  
+//  private TaskAttemptImpl createMapTaskAttemptImplForTest(
+//      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
+//    Clock clock = new SystemClock();
+//    return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
+//  }
+//  
+//  private TaskAttemptImpl createMapTaskAttemptImplForTest(
+//      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
+//    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+//    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+//    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+//    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+//    Path jobFile = mock(Path.class);
+//    JobConf jobConf = new JobConf();
+//    OutputCommitter outputCommitter = mock(OutputCommitter.class);
+//    TaskAttemptImpl taImpl =
+//        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+//            taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+//            null, clock, null);
+//    return taImpl;
+//  }
+//
+//  private void testMRAppHistory(MRApp app) throws Exception {
+//    Configuration conf = new Configuration();
+//    Job job = app.submit(conf);
+//    app.waitForState(job, JobState.FAILED);
+//    Map<TaskId, Task> tasks = job.getTasks();
+//
+//    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+//    Task task = tasks.values().iterator().next();
+//    Assert.assertEquals("Task state not correct", TaskState.FAILED, task
+//        .getReport().getTaskState());
+//    Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
+//        .getAttempts();
+//    Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
+//
+//    Iterator<TaskAttempt> it = attempts.values().iterator();
+//    TaskAttemptReport report = it.next().getReport();
+//    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+//        report.getTaskAttemptState());
+//    Assert.assertEquals("Diagnostic Information is not Correct",
+//        "Test Diagnostic Event", report.getDiagnosticInfo());
+//    report = it.next().getReport();
+//    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+//        report.getTaskAttemptState());
+//  }
+//
+//  static class FailingAttemptsMRApp extends MRApp {
+//    FailingAttemptsMRApp(int maps, int reduces) {
+//      super(maps, reduces, true, "FailingAttemptsMRApp", true);
+//    }
+//
+//    @Override
+//    protected void attemptLaunched(TaskAttemptId attemptID) {
+//      getContext().getEventHandler().handle(
+//          new TaskAttemptDiagnosticsUpdateEvent(attemptID,
+//              "Test Diagnostic Event"));
+//      getContext().getEventHandler().handle(
+//          new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+//    }
+//
+//    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+//        AppContext context) {
+//      return new EventHandler<JobHistoryEvent>() {
+//        @Override
+//        public void handle(JobHistoryEvent event) {
+//          if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.EventType.MAP_ATTEMPT_FAILED) {
+//            TaskAttemptUnsuccessfulCompletion datum = (TaskAttemptUnsuccessfulCompletion) event
+//                .getHistoryEvent().getDatum();
+//            Assert.assertEquals("Diagnostic Information is not Correct",
+//                "Test Diagnostic Event", datum.get(8).toString());
+//          }
+//        }
+//      };
+//    }
+//  }
+//  
+//  @Test
+//  public void testLaunchFailedWhileKilling() throws Exception {
+//    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+//    ApplicationAttemptId appAttemptId = 
+//      BuilderUtils.newApplicationAttemptId(appId, 0);
+//    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+//    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+//    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+//    Path jobFile = mock(Path.class);
+//    
+//    MockEventHandler eventHandler = new MockEventHandler();
+//    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+//    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+//    
+//    JobConf jobConf = new JobConf();
+//    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+//    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+//    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+//    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+//    
+//    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+//    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+//    
+//    TaskAttemptImpl taImpl =
+//      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+//          splits, jobConf, taListener,
+//          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+//          new SystemClock(), null);
+//
+//    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+//    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+//    Container container = mock(Container.class);
+//    when(container.getId()).thenReturn(contId);
+//    when(container.getNodeId()).thenReturn(nid);
+//    
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_SCHEDULE));
+//    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+//        container, mock(Map.class)));
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_KILL));
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+//    assertFalse(eventHandler.internalError);
+//  }
+//  
+//  @Test
+//  public void testContainerCleanedWhileRunning() throws Exception {
+//    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+//    ApplicationAttemptId appAttemptId =
+//      BuilderUtils.newApplicationAttemptId(appId, 0);
+//    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+//    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+//    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+//    Path jobFile = mock(Path.class);
+//
+//    MockEventHandler eventHandler = new MockEventHandler();
+//    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+//    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+//
+//    JobConf jobConf = new JobConf();
+//    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+//    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+//    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+//    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+//
+//    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+//    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+//
+//    AppContext appCtx = mock(AppContext.class);
+//    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+//    Resource resource = mock(Resource.class);
+//    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+//    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+//    when(resource.getMemory()).thenReturn(1024);
+//
+//    TaskAttemptImpl taImpl =
+//      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+//          splits, jobConf, taListener,
+//          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+//          new SystemClock(), appCtx);
+//
+//    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+//    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+//    Container container = mock(Container.class);
+//    when(container.getId()).thenReturn(contId);
+//    when(container.getNodeId()).thenReturn(nid);
+//    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+//
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_SCHEDULE));
+//    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+//        container, mock(Map.class)));
+//    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+//    assertEquals("Task attempt is not in running state", taImpl.getState(),
+//        TaskAttemptState.RUNNING);
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+//    assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+//        eventHandler.internalError);
+//  }
+//
+//  @Test
+//  public void testContainerCleanedWhileCommitting() throws Exception {
+//    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+//    ApplicationAttemptId appAttemptId =
+//      BuilderUtils.newApplicationAttemptId(appId, 0);
+//    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+//    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+//    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+//    Path jobFile = mock(Path.class);
+//
+//    MockEventHandler eventHandler = new MockEventHandler();
+//    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+//    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+//
+//    JobConf jobConf = new JobConf();
+//    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+//    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+//    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+//    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+//
+//    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+//    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+//
+//    AppContext appCtx = mock(AppContext.class);
+//    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+//    Resource resource = mock(Resource.class);
+//    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+//    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+//    when(resource.getMemory()).thenReturn(1024);
+//
+//    TaskAttemptImpl taImpl =
+//      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+//          splits, jobConf, taListener,
+//          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+//          new SystemClock(), appCtx);
+//
+//    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+//    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+//    Container container = mock(Container.class);
+//    when(container.getId()).thenReturn(contId);
+//    when(container.getNodeId()).thenReturn(nid);
+//    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+//
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_SCHEDULE));
+//    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+//        container, mock(Map.class)));
+//    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_COMMIT_PENDING));
+//
+//    assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
+//        TaskAttemptState.COMMIT_PENDING);
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+//    assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+//        eventHandler.internalError);
+//  }
+//  
+//  @Test
+//  public void testDoubleTooManyFetchFailure() throws Exception {
+//    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+//    ApplicationAttemptId appAttemptId =
+//      BuilderUtils.newApplicationAttemptId(appId, 0);
+//    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+//    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+//    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+//    Path jobFile = mock(Path.class);
+//
+//    MockEventHandler eventHandler = new MockEventHandler();
+//    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+//    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+//
+//    JobConf jobConf = new JobConf();
+//    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+//    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+//    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+//    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+//
+//    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+//    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+//
+//    AppContext appCtx = mock(AppContext.class);
+//    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+//    Resource resource = mock(Resource.class);
+//    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+//    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+//    when(resource.getMemory()).thenReturn(1024);
+//
+//    TaskAttemptImpl taImpl =
+//      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+//          splits, jobConf, taListener,
+//          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+//          new SystemClock(), appCtx);
+//
+//    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+//    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+//    Container container = mock(Container.class);
+//    when(container.getId()).thenReturn(contId);
+//    when(container.getNodeId()).thenReturn(nid);
+//    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+//
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_SCHEDULE));
+//    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+//        container, mock(Map.class)));
+//    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_DONE));
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+//    
+//    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+//        TaskAttemptState.SUCCEEDED);
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+//    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+//        TaskAttemptState.FAILED);
+//    taImpl.handle(new TaskAttemptEvent(attemptId,
+//        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+//    assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
+//        TaskAttemptState.FAILED);
+//    assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+//        eventHandler.internalError);
+//  }
+//
+//  public static class MockEventHandler implements EventHandler {
+//    public boolean internalError;
+//    
+//    @Override
+//    public void handle(Event event) {
+//      if (event instanceof JobEvent) {
+//        JobEvent je = ((JobEvent) event);
+//        if (JobEventType.INTERNAL_ERROR == je.getType()) {
+//          internalError = true;
+//        }
+//      }
+//    }
+//    
+//  };
+//}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,503 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings("rawtypes")
+public class TestTaskImpl {
+
+  private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);    
+  
+  private JobConf conf;
+  private TaskAttemptListener taskAttemptListener;
+  private TaskHeartbeatHandler taskHeartbeatHandler;
+  private OutputCommitter committer;
+  private Token<JobTokenIdentifier> jobToken;
+  private JobId jobId;
+  private Path remoteJobConfFile;
+  private Credentials credentials;
+  private Clock clock;
+  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
+  private MRAppMetrics metrics;
+  private TaskImpl mockTask;
+  private ApplicationId appId;
+  private TaskSplitMetaInfo taskSplitMetaInfo;  
+  private String[] dataLocations = new String[0]; 
+  private final TaskType taskType = TaskType.MAP;
+  private AppContext appContext;
+  
+  private int startCount = 0;
+  private int taskCounter = 0;
+  private final int partition = 1;
+  
+  private InlineDispatcher dispatcher;   
+  private List<MockTaskAttemptImpl> taskAttempts;
+  
+  private class MockTaskImpl extends TaskImpl {
+        
+    private int taskAttemptCounter = 0;
+
+    public MockTaskImpl(JobId jobId, int partition,
+        EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
+        TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+        Token<JobTokenIdentifier> jobToken,
+        Credentials credentials, Clock clock,
+        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
+        MRAppMetrics metrics, TaskHeartbeatHandler thh, AppContext appContext) {
+      super(jobId, taskType , partition, eventHandler,
+          remoteJobConfFile, conf, taskAttemptListener, committer, 
+          jobToken, credentials, clock,
+          completedTasksFromPreviousRun, startCount, metrics, thh, appContext);
+    }
+
+    @Override
+    public TaskType getType() {
+      return taskType;
+    }
+
+    @Override
+    protected TaskAttemptImpl createAttempt() {
+      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, 
+          eventHandler, taskAttemptListener, remoteJobConfFile, partition,
+          conf, committer, jobToken, credentials, clock, taskHeartbeatHandler, 
+          appContext);
+      taskAttempts.add(attempt);
+      return attempt;
+    }
+
+    @Override
+    protected int getMaxAttempts() {
+      return 100;
+    }
+
+    @Override
+    protected void internalError(TaskEventType type) {
+      super.internalError(type);
+      fail("Internal error: " + type);
+    }
+  }
+  
+  private class MockTaskAttemptImpl extends TaskAttemptImpl {
+
+    private float progress = 0;
+    private TaskAttemptState state = TaskAttemptState.NEW;
+    private TaskAttemptId attemptId;
+
+    public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
+        JobConf conf, OutputCommitter committer,
+        Token<JobTokenIdentifier> jobToken,
+        Credentials credentials, Clock clock,
+        TaskHeartbeatHandler thh, AppContext appContext) {
+      super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
+          dataLocations, committer, jobToken, credentials, clock, thh,
+          appContext);
+
+      attemptId = Records.newRecord(TaskAttemptId.class);
+      attemptId.setId(id);
+      attemptId.setTaskId(taskId);
+    }
+
+    public TaskAttemptId getAttemptId() {
+      return attemptId;
+    }
+    
+    @Override
+    protected Task createRemoteTask() {
+      return new MockTask();
+    }    
+    
+    public float getProgress() {
+      return progress ;
+    }
+    
+    public void setProgress(float progress) {
+      this.progress = progress;
+    }
+    
+    public void setState(TaskAttemptState state) {
+      this.state = state;
+    }
+    
+    public TaskAttemptState getState() {
+      return state;
+    }
+    
+  }
+  
+  private class MockTask extends Task {
+
+    @Override
+    public void run(JobConf job, TaskUmbilicalProtocol umbilical)
+        throws IOException, ClassNotFoundException, InterruptedException {
+      return;
+    }
+
+    @Override
+    public boolean isMapTask() {
+      return true;
+    }    
+    
+  }
+  
+  @Before 
+  @SuppressWarnings("unchecked")
+  public void setup() {
+     dispatcher = new InlineDispatcher();
+    
+    ++startCount;
+    
+    conf = new JobConf();
+    taskAttemptListener = mock(TaskAttemptListener.class);
+    taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    committer = mock(OutputCommitter.class);
+    jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
+    remoteJobConfFile = mock(Path.class);
+    credentials = null;
+    clock = new SystemClock();
+    metrics = mock(MRAppMetrics.class);  
+    dataLocations = new String[1];
+    
+    appId = Records.newRecord(ApplicationId.class);
+    appId.setClusterTimestamp(System.currentTimeMillis());
+    appId.setId(1);
+
+    jobId = Records.newRecord(JobId.class);
+    jobId.setId(1);
+    jobId.setAppId(appId);
+    appContext = mock(AppContext.class);
+
+    taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+    when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); 
+    
+    taskAttempts = new ArrayList<MockTaskAttemptImpl>();
+    
+    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        credentials, clock,
+        completedTasksFromPreviousRun, startCount,
+        metrics, taskHeartbeatHandler, appContext);        
+    
+  }
+
+  @After 
+  public void teardown() {
+    taskAttempts.clear();
+  }
+  
+  private TaskId getNewTaskID() {
+    TaskId taskId = Records.newRecord(TaskId.class);
+    taskId.setId(++taskCounter);
+    taskId.setJobId(jobId);
+    taskId.setTaskType(mockTask.getType());    
+    return taskId;
+  }
+  
+  private void scheduleTaskAttempt(TaskId taskId) {
+    mockTask.handle(new TaskEvent(taskId, 
+        TaskEventType.T_SCHEDULE));
+    assertTaskScheduledState();
+  }
+  
+  private void killTask(TaskId taskId) {
+    mockTask.handle(new TaskEvent(taskId, 
+        TaskEventType.T_KILL));
+    assertTaskKillWaitState();
+  }
+  
+  private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId, 
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskScheduledState();
+  }
+
+  private void launchTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId, 
+        TaskEventType.T_ATTEMPT_LAUNCHED));
+    assertTaskRunningState();    
+  }
+  
+  private void commitTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId, 
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    assertTaskRunningState();    
+  }
+  
+  private MockTaskAttemptImpl getLastAttempt() {
+    return taskAttempts.get(taskAttempts.size()-1);
+  }
+  
+  private void updateLastAttemptProgress(float p) {    
+    getLastAttempt().setProgress(p);
+  }
+
+  private void updateLastAttemptState(TaskAttemptState s) {
+    getLastAttempt().setState(s);
+  }
+  
+  private void killRunningTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId, 
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskRunningState();  
+  }
+  
+  private void failRunningTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId, 
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertTaskRunningState();
+  }
+  
+  /**
+   * {@link TaskState#NEW}
+   */
+  private void assertTaskNewState() {
+    assertEquals(TaskState.NEW, mockTask.getState());
+  }
+  
+  /**
+   * {@link TaskState#SCHEDULED}
+   */
+  private void assertTaskScheduledState() {
+    assertEquals(TaskState.SCHEDULED, mockTask.getState());
+  }
+
+  /**
+   * {@link TaskState#RUNNING}
+   */
+  private void assertTaskRunningState() {
+    assertEquals(TaskState.RUNNING, mockTask.getState());
+  }
+    
+  /**
+   * {@link TaskState#KILL_WAIT}
+   */
+  private void assertTaskKillWaitState() {
+    assertEquals(TaskState.KILL_WAIT, mockTask.getState());
+  }
+  
+  /**
+   * {@link TaskState#SUCCEEDED}
+   */
+  private void assertTaskSucceededState() {
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+  }
+  
+  @Test
+  public void testInit() {
+    LOG.info("--- START: testInit ---");
+    assertTaskNewState();
+    assert(taskAttempts.size() == 0);
+  }
+
+  @Test
+  /**
+   * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
+   */
+  public void testScheduleTask() {
+    LOG.info("--- START: testScheduleTask ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+  }
+  
+  @Test 
+  /**
+   * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT}
+   */
+  public void testKillScheduledTask() {
+    LOG.info("--- START: testKillScheduledTask ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    killTask(taskId);
+  }
+  
+  @Test 
+  /**
+   * Kill attempt
+   * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
+   */
+  public void testKillScheduledTaskAttempt() {
+    LOG.info("--- START: testKillScheduledTaskAttempt ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    killScheduledTaskAttempt(getLastAttempt().getAttemptId());
+  }
+  
+  @Test 
+  /**
+   * Launch attempt
+   * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
+   */
+  public void testLaunchTaskAttempt() {
+    LOG.info("--- START: testLaunchTaskAttempt ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+  }
+
+  @Test
+  /**
+   * Kill running attempt
+   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING} 
+   */
+  public void testKillRunningTaskAttempt() {
+    LOG.info("--- START: testKillRunningTaskAttempt ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    killRunningTaskAttempt(getLastAttempt().getAttemptId());    
+  }
+
+  @Test 
+  public void testTaskProgress() {
+    LOG.info("--- START: testTaskProgress ---");
+        
+    // launch task
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    float progress = 0f;
+    assert(mockTask.getProgress() == progress);
+    launchTaskAttempt(getLastAttempt().getAttemptId());    
+    
+    // update attempt1 
+    progress = 50f;
+    updateLastAttemptProgress(progress);
+    assert(mockTask.getProgress() == progress);
+    progress = 100f;
+    updateLastAttemptProgress(progress);
+    assert(mockTask.getProgress() == progress);
+    
+    progress = 0f;
+    // mark first attempt as killed
+    updateLastAttemptState(TaskAttemptState.KILLED);
+    assert(mockTask.getProgress() == progress);
+
+    // kill first attempt 
+    // should trigger a new attempt
+    // as no successful attempts 
+    killRunningTaskAttempt(getLastAttempt().getAttemptId());
+    assert(taskAttempts.size() == 2);
+    
+    assert(mockTask.getProgress() == 0f);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    progress = 50f;
+    updateLastAttemptProgress(progress);
+    assert(mockTask.getProgress() == progress);
+        
+  }
+  
+  @Test
+  public void testFailureDuringTaskAttemptCommit() {
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+
+    // During the task attempt commit there is an exception which causes
+    // the attempt to fail
+    updateLastAttemptState(TaskAttemptState.FAILED);
+    failRunningTaskAttempt(getLastAttempt().getAttemptId());
+
+    assertEquals(2, taskAttempts.size());
+    updateLastAttemptState(TaskAttemptState.SUCCEEDED);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    
+    assertFalse("First attempt should not commit",
+        mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
+    assertTrue("Second attempt should commit",
+        mockTask.canCommit(getLastAttempt().getAttemptId()));
+
+    assertTaskSucceededState();
+  }
+  
+  @Test
+  public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt that succeeds
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    
+    // The task should now have succeeded
+    assertTaskSucceededState();
+    
+    // Now fail the first task attempt, after the second has succeeded
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), 
+        TaskEventType.T_ATTEMPT_FAILED));
+    
+    // The task should still be in the succeeded state
+    assertTaskSucceededState();
+    
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,412 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.launcher;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestContainerLauncher {
+
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+  Configuration conf;
+  Server server;
+
+  static final Log LOG = LogFactory.getLog(TestContainerLauncher.class);
+
+  /**
+   * Verifies that the launcher's thread pool grows with the number of
+   * distinct target hosts: 10 hosts -> 10 live threads, repeating the same
+   * hosts adds no threads, and an 11th host raises the core pool size
+   * (to 11 + INITIAL_POOL_SIZE) while the live pool only reaches 11.
+   */
+  @Test
+  public void testPoolSize() throws InterruptedException {
+
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+      appId, 3);
+
+    AppContext context = mock(AppContext.class);
+    AMNodeMap nodes = new AMNodeMap(mock(EventHandler.class), context);
+    when(context.getAllNodes()).thenReturn(nodes);
+    CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
+      context);
+    containerLauncher.init(new Configuration());
+    containerLauncher.start();
+
+    ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
+
+    // No events yet: no live threads, core size at the initial default.
+    Assert.assertEquals(0, threadPool.getPoolSize());
+    Assert.assertEquals(ContainerLauncherImpl.INITIAL_POOL_SIZE,
+      threadPool.getCorePoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // 10 launch requests to 10 distinct hosts -> 10 pool threads.
+    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    for (int i = 0; i < 10; i++) {
+      ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i);
+      NodeId nodeId = BuilderUtils.newNodeId("host" + i, 1234);
+      nodes.nodeSeen(nodeId);
+      containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId,
+          null, NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    }
+    waitForEvents(containerLauncher, 10);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // Same set of hosts, so no change
+    containerLauncher.finishEventHandling = true;
+    // Poll (up to ~200s) until all 10 events have been fully processed.
+    int timeOut = 0;
+    while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) {
+      LOG.info("Waiting for number of events processed to become " + 10
+          + ". It is now " + containerLauncher.numEventsProcessed.get()
+          + ". Timeout is " + timeOut);
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
+    containerLauncher.finishEventHandling = false;
+    // 10 more requests to the SAME hosts: pool must not grow past 10.
+    for (int i = 0; i < 10; i++) {
+      ContainerId containerId = BuilderUtils.newContainerId(appAttemptId,
+          i + 10);
+      NodeId nodeId = BuilderUtils.newNodeId("host" + i, 1234);
+      nodes.nodeSeen(nodeId);
+      containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId,
+          null, NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    }
+    waitForEvents(containerLauncher, 20);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // Different hosts, there should be an increase in core-thread-pool size to
+    // 21(11hosts+10buffer)
+    // Core pool size should be 21 but the live pool size should be only 11.
+    containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    containerLauncher.finishEventHandling = false;
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
+    NodeId nodeId = BuilderUtils.newNodeId("host11", 1234);
+    nodes.nodeSeen(nodeId);
+    containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId, null,
+        NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    waitForEvents(containerLauncher, 21);
+    Assert.assertEquals(11, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    containerLauncher.stop();
+  }
+
+  /**
+   * Verifies that the launcher's thread pool, although it grows with the
+   * number of distinct hosts, is capped by
+   * MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT (set to 12 here):
+   * 10 hosts -> 10 threads, 14 hosts -> still only 12 threads.
+   */
+  @Test
+  public void testPoolLimits() throws InterruptedException {
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+      appId, 3);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
+
+    AppContext context = mock(AppContext.class);
+    AMNodeMap nodes = new AMNodeMap(mock(EventHandler.class), context);
+    when(context.getAllNodes()).thenReturn(nodes);
+    CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
+      context);
+    Configuration conf = new Configuration();
+    // Cap the launcher thread pool at 12 threads.
+    conf.setInt(MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, 12);
+    containerLauncher.init(conf);
+    containerLauncher.start();
+
+    ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
+
+    // 10 different hosts
+    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    for (int i = 0; i < 10; i++) {
+      NodeId nodeId = BuilderUtils.newNodeId("host" + i, 1234);
+      nodes.nodeSeen(nodeId);
+      containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId,
+          null, NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    }
+    waitForEvents(containerLauncher, 10);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // 4 more different hosts, but thread pool size should be capped at 12
+    containerLauncher.expectedCorePoolSize = 12 ;
+    for (int i = 1; i <= 4; i++) {
+      NodeId nodeId = BuilderUtils.newNodeId("host1" + i, 1234);
+      nodes.nodeSeen(nodeId);
+      containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId,
+          null, NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    }
+    waitForEvents(containerLauncher, 12);
+    Assert.assertEquals(12, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // Make some threads idle so that remaining events are also done.
+    containerLauncher.finishEventHandling = true;
+    waitForEvents(containerLauncher, 14);
+    Assert.assertEquals(12, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    containerLauncher.stop();
+  }
+
+  /**
+   * Polls (once per second, at most 20 times) until the launcher has begun
+   * processing {@code expectedNumEvents} events, then asserts the count.
+   */
+  private void waitForEvents(CustomContainerLauncher containerLauncher,
+      int expectedNumEvents) throws InterruptedException {
+    for (int attempt = 0; attempt < 20
+        && containerLauncher.numEventsProcessing.get() < expectedNumEvents;
+        attempt++) {
+      LOG.info("Waiting for number of events to become " + expectedNumEvents
+          + ". It is now " + containerLauncher.numEventsProcessing.get());
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(expectedNumEvents,
+      containerLauncher.numEventsProcessing.get());
+  }
+
+  // Exercises the slow-NodeManager scenario; the actual logic lives in
+  // test() below.
+  @Test
+  public void testSlowNM() throws Exception {
+    test();
+  }
+
+  /**
+   * Submits a single-map job against a DummyContainerManager whose
+   * startContainer() stalls longer than the configured NM command timeout
+   * (3 s), then verifies the job fails and the attempt diagnostics record a
+   * SocketTimeoutException for the container launch.
+   */
+  private void test() throws Exception {
+
+    conf = new Configuration();
+    int maxAttempts = 1;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    // set timeout low for the test
+    conf.setInt("yarn.rpc.nm-command-timeout", 3000);
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    server = rpc.getServer(ContainerManager.class, new DummyContainerManager(),
+        addr, conf, null, 1);
+    server.start();
+
+    MRApp app = new MRAppWithSlowNM();
+
+    try {
+      Job job = app.submit(conf);
+      app.waitForState(job, JobState.RUNNING);
+
+      Map<TaskId, Task> tasks = job.getTasks();
+      Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+
+      Task task = tasks.values().iterator().next();
+      app.waitForState(task, TaskState.SCHEDULED);
+
+      Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
+          .next().getAttempts();
+      Assert.assertEquals("Num attempts is not correct", maxAttempts,
+          attempts.size());
+
+      TaskAttempt attempt = attempts.values().iterator().next();
+      // The attempt is stuck waiting for the (stalled) container launch.
+      app.waitForState(attempt, TaskAttemptState.START_WAIT);
+
+      // Launch times out and only one attempt is allowed, so the job fails.
+      app.waitForState(job, JobState.FAILED);
+
+      String diagnostics = attempt.getDiagnostics().toString();
+      LOG.info("attempt.getDiagnostics: " + diagnostics);
+
+      Assert.assertTrue(diagnostics.contains("Container launch failed for "
+          + "container_0_0000_01_000000 : "));
+      Assert
+          .assertTrue(diagnostics
+              .contains("java.net.SocketTimeoutException: 3000 millis timeout while waiting for channel"));
+
+    } finally {
+      // Stop the app even if server.stop() throws (and vice versa).
+      try {
+        server.stop();
+      } finally {
+        app.stop();
+      }
+    }
+  }
+
+  /**
+   * A ContainerLauncherImpl whose event processors stall (until
+   * {@code finishEventHandling} is set) so tests can observe thread-pool
+   * growth. Any mismatch between the expected and actual core pool size at
+   * the moment an event processor is created is recorded in
+   * {@code foundErrors}.
+   */
+  private final class CustomContainerLauncher extends ContainerLauncherImpl {
+
+    // Core pool size the test expects when the next event arrives.
+    private volatile int expectedCorePoolSize = 0;
+    // Count of events whose processing has started / fully completed.
+    private final AtomicInteger numEventsProcessing = new AtomicInteger(0);
+    private final AtomicInteger numEventsProcessed = new AtomicInteger(0);
+    // First pool-size mismatch observed, or null if none.
+    private volatile String foundErrors = null;
+    // Flipped to true by tests to release all stalled event processors.
+    private volatile boolean finishEventHandling;
+
+    private CustomContainerLauncher(AppContext context) {
+      super(context);
+    }
+
+    public ThreadPoolExecutor getThreadPool() {
+      return super.launcherPool;
+    }
+
+    private final class CustomEventProcessor extends
+        ContainerLauncherImpl.EventProcessor {
+      private final NMCommunicatorEvent event;
+
+      private CustomEventProcessor(NMCommunicatorEvent event) {
+        super(event);
+        this.event = event;
+      }
+
+      @Override
+      public void run() {
+        // Do nothing substantial: mark the event as in-progress, then stall
+        // so the pool cannot reuse this thread for another event.
+        LOG.info("Processing the event " + event.toString());
+
+        numEventsProcessing.incrementAndGet();
+        // Stall until the test releases us via finishEventHandling.
+        while (!finishEventHandling) {
+          synchronized (this) {
+            try {
+              wait(1000);
+            } catch (InterruptedException ignored) {
+              // Deliberately ignored; the loop re-checks finishEventHandling.
+            }
+          }
+        }
+        numEventsProcessed.incrementAndGet();
+      }
+    }
+
+    @Override
+    protected ContainerLauncherImpl.EventProcessor createEventProcessor(
+        final NMCommunicatorEvent event) {
+      // At this point of time, the EventProcessor is being created and so no
+      // additional threads would have been created.
+
+      // Core-pool-size should have increased by now.
+      if (expectedCorePoolSize != launcherPool.getCorePoolSize()) {
+        foundErrors = "Expected " + expectedCorePoolSize + " but found "
+            + launcherPool.getCorePoolSize();
+      }
+
+      return new CustomEventProcessor(event);
+    }
+  }
+
+  /**
+   * MRApp whose ContainerLauncher connects to the local
+   * DummyContainerManager server (which stalls on startContainer) instead of
+   * a real NodeManager.
+   */
+  private class MRAppWithSlowNM extends MRApp {
+
+    public MRAppWithSlowNM() {
+      super(1, 0, false, "TestContainerLauncher", true);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new ContainerLauncherImpl(context) {
+        @Override
+        protected ContainerManager getCMProxy(ContainerId containerID,
+            String containerManagerBindAddr, ContainerToken containerToken)
+            throws IOException {
+          // make proxy connect to our local containerManager server
+          ContainerManager proxy = (ContainerManager) rpc.getProxy(
+              ContainerManager.class,
+              NetUtils.getConnectAddress(server), conf);
+          return proxy;
+        }
+      };
+    }
+  }
+
+  /**
+   * ContainerManager stub that simulates an unresponsive NodeManager:
+   * startContainer() sleeps well past the test's 3 s NM command timeout,
+   * and stopContainer() always throws.
+   */
+  public class DummyContainerManager implements ContainerManager {
+
+    // Status of the last container started; null until startContainer runs.
+    private ContainerStatus status = null;
+
+    @Override
+    public GetContainerStatusResponse getContainerStatus(
+        GetContainerStatusRequest request) throws YarnRemoteException {
+      GetContainerStatusResponse response = recordFactory
+          .newRecordInstance(GetContainerStatusResponse.class);
+      response.setStatus(status);
+      return response;
+    }
+
+    @Override
+    public StartContainerResponse startContainer(StartContainerRequest request)
+        throws YarnRemoteException {
+      ContainerLaunchContext container = request.getContainerLaunchContext();
+      StartContainerResponse response = recordFactory
+          .newRecordInstance(StartContainerResponse.class);
+      status = recordFactory.newRecordInstance(ContainerStatus.class);
+      try {
+        // make the thread sleep to look like its not going to respond
+        Thread.sleep(15000);
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new UndeclaredThrowableException(e);
+      }
+      status.setState(ContainerState.RUNNING);
+      status.setContainerId(container.getContainerId());
+      status.setExitStatus(0);
+      return response;
+    }
+
+    @Override
+    public StopContainerResponse stopContainer(StopContainerRequest request)
+        throws YarnRemoteException {
+      Exception e = new Exception("Dummy function", new Exception(
+          "Dummy function cause"));
+      throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(
+          null).createYarnRemoteException(e);
+    }
+  }
+  }

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncherImpl.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncherImpl.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncherImpl.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,273 @@
+package org.apache.hadoop.mapreduce.v2.app2.launcher;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+/**
+ * Tests ContainerLauncherImpl against a mocked YarnRPC/ContainerManager:
+ * normal launch-then-stop handling, out-of-order stop-before-launch
+ * handling, and killing of live containers on service shutdown.
+ */
+public class TestContainerLauncherImpl {
+  static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class);
+  private static final RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  /** Launcher subclass that uses an injected (mock) RPC implementation. */
+  private static class ContainerLauncherImplUnderTest extends
+    ContainerLauncherImpl {
+
+    private YarnRPC rpc;
+
+    public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) {
+      super(context);
+      this.rpc = rpc;
+    }
+
+    @Override
+    protected YarnRPC createYarnRPC(Configuration conf) {
+      return rpc;
+    }
+
+    /** Logs the current event-queue and thread-pool occupancy. */
+    private void logPoolState() {
+      LOG.debug("POOL SIZE 1: " + this.eventQueue.size()
+          + " POOL SIZE 2: " + this.launcherPool.getQueue().size()
+          + " ACTIVE COUNT: " + this.launcherPool.getActiveCount());
+    }
+
+    /**
+     * Blocks until the launcher's event queue is drained and its thread
+     * pool has no queued or active work.
+     */
+    public void waitForPoolToIdle() throws InterruptedException {
+      // I wish that we did not need the sleep, but it is here so that we are
+      // sure that the other thread had time to insert the event into the
+      // queue and start processing it. For some reason we were getting
+      // interrupted exceptions within eventQueue without this sleep.
+      Thread.sleep(100L);
+      logPoolState();
+      while (!this.eventQueue.isEmpty()
+          || !this.launcherPool.getQueue().isEmpty()
+          || this.launcherPool.getActiveCount() > 0) {
+        Thread.sleep(100L);
+        logPoolState();
+      }
+      logPoolState();
+    }
+  }
+
+  /** Builds a ContainerId for the given timestamp/app/attempt/container. */
+  public static ContainerId makeContainerId(long ts, int appId, int attemptId,
+      int id) {
+    return BuilderUtils.newContainerId(
+      BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(ts, appId), attemptId), id);
+  }
+
+  /**
+   * Builds a TaskAttemptId. NOTE(review): {@code id} is used both as the
+   * job id and as the attempt id - confirm this is intentional.
+   */
+  public static TaskAttemptId makeTaskAttemptId(long ts, int appId, int taskId,
+      TaskType taskType, int id) {
+    ApplicationId aID = BuilderUtils.newApplicationId(ts, appId);
+    JobId jID = MRBuilderUtils.newJobId(aID, id);
+    TaskId tID = MRBuilderUtils.newTaskId(jID, taskId, taskType);
+    return MRBuilderUtils.newTaskAttemptId(tID, id);
+  }
+
+  /**
+   * Wires a mock event handler and a real AMNodeMap into the given mock
+   * AppContext; returns the node map so tests can mark nodes as seen.
+   */
+  private static AMNodeMap setupNodeMap(AppContext mockContext) {
+    @SuppressWarnings("rawtypes")
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+    AMNodeMap amNodeMap = new AMNodeMap(mockEventHandler, mockContext);
+    when(mockContext.getAllNodes()).thenReturn(amNodeMap);
+    return amNodeMap;
+  }
+
+  /** Mocks a YarnRPC whose getProxy() always returns the given CM. */
+  private static YarnRPC mockRpcReturning(ContainerManager mockCM) {
+    YarnRPC mockRpc = mock(YarnRPC.class);
+    when(mockRpc.getProxy(eq(ContainerManager.class),
+        any(InetSocketAddress.class), any(Configuration.class)))
+        .thenReturn(mockCM);
+    return mockRpc;
+  }
+
+  /** Creates the StartContainerResponse the mock CM should return. */
+  private static StartContainerResponse newStartContainerResponse()
+      throws Exception {
+    StartContainerResponse startResp =
+      recordFactory.newRecordInstance(StartContainerResponse.class);
+    startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+        ShuffleHandler.serializeMetaData(80));
+    return startResp;
+  }
+
+  /** Mocks a CONTAINER_LAUNCH_REQUEST event for the given container/node. */
+  private static NMCommunicatorLaunchRequestEvent newLaunchEvent(
+      ContainerId contId, NodeId nodeId) {
+    NMCommunicatorLaunchRequestEvent event =
+        mock(NMCommunicatorLaunchRequestEvent.class);
+    when(event.getType()).thenReturn(
+        NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+    when(event.getContainerId()).thenReturn(contId);
+    when(event.getNodeId()).thenReturn(nodeId);
+    return event;
+  }
+
+  /** Mocks a CONTAINER_STOP_REQUEST event for the given container/node. */
+  private static NMCommunicatorStopRequestEvent newStopEvent(
+      ContainerId contId, NodeId nodeId) {
+    NMCommunicatorStopRequestEvent event =
+        mock(NMCommunicatorStopRequestEvent.class);
+    when(event.getType()).thenReturn(
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+    when(event.getContainerId()).thenReturn(contId);
+    when(event.getNodeId()).thenReturn(nodeId);
+    return event;
+  }
+
+  @Test
+  public void testHandle() throws Exception {
+    LOG.info("STARTING testHandle");
+    AppContext mockContext = mock(AppContext.class);
+    AMNodeMap amNodeMap = setupNodeMap(mockContext);
+    ContainerManager mockCM = mock(ContainerManager.class);
+    YarnRPC mockRpc = mockRpcReturning(mockCM);
+
+    ContainerLauncherImplUnderTest ut =
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0l, 0, 0, 1);
+      NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 8000);
+      amNodeMap.nodeSeen(nodeId);
+
+      LOG.info("inserting launch event");
+      when(mockCM.startContainer(any(StartContainerRequest.class)))
+          .thenReturn(newStartContainerResponse());
+      ut.handle(newLaunchEvent(contId, nodeId));
+
+      ut.waitForPoolToIdle();
+
+      verify(mockCM).startContainer(any(StartContainerRequest.class));
+
+      LOG.info("inserting cleanup event");
+      ut.handle(newStopEvent(contId, nodeId));
+
+      ut.waitForPoolToIdle();
+
+      verify(mockCM).stopContainer(any(StopContainerRequest.class));
+    } finally {
+      ut.stop();
+    }
+  }
+
+  @Test
+  public void testOutOfOrder() throws Exception {
+    LOG.info("STARTING testOutOfOrder");
+    AppContext mockContext = mock(AppContext.class);
+    AMNodeMap amNodeMap = setupNodeMap(mockContext);
+    ContainerManager mockCM = mock(ContainerManager.class);
+    YarnRPC mockRpc = mockRpcReturning(mockCM);
+
+    ContainerLauncherImplUnderTest ut =
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0l, 0, 0, 1);
+      NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 8000);
+      amNodeMap.nodeSeen(nodeId);
+
+      // A stop for a container that was never launched must be a no-op.
+      LOG.info("inserting cleanup event");
+      ut.handle(newStopEvent(contId, nodeId));
+
+      ut.waitForPoolToIdle();
+
+      verify(mockCM, never()).stopContainer(any(StopContainerRequest.class));
+
+      // A launch arriving after the stop must not start the container.
+      LOG.info("inserting launch event");
+      when(mockCM.startContainer(any(StartContainerRequest.class)))
+          .thenReturn(newStartContainerResponse());
+      ut.handle(newLaunchEvent(contId, nodeId));
+
+      ut.waitForPoolToIdle();
+
+      verify(mockCM, never()).startContainer(any(StartContainerRequest.class));
+    } finally {
+      ut.stop();
+    }
+  }
+
+  @Test
+  public void testMyShutdown() throws Exception {
+    LOG.info("in test Shutdown");
+    AppContext mockContext = mock(AppContext.class);
+    AMNodeMap amNodeMap = setupNodeMap(mockContext);
+    ContainerManager mockCM = mock(ContainerManager.class);
+    YarnRPC mockRpc = mockRpcReturning(mockCM);
+
+    ContainerLauncherImplUnderTest ut =
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0l, 0, 0, 1);
+      NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 8000);
+      amNodeMap.nodeSeen(nodeId);
+
+      LOG.info("inserting launch event");
+      when(mockCM.startContainer(any(StartContainerRequest.class)))
+          .thenReturn(newStartContainerResponse());
+      ut.handle(newLaunchEvent(contId, nodeId));
+
+      ut.waitForPoolToIdle();
+
+      verify(mockCM).startContainer(any(StartContainerRequest.class));
+
+      // skip cleanup and make sure stop kills the container
+    } finally {
+      ut.stop();
+      verify(mockCM).stopContainer(any(StopContainerRequest.class));
+    }
+  }
+}



Mime
View raw message