tez-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1457129 [34/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Date: Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,463 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl.InitTransition;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl.JobNoTasksCompletedTransition;
+import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests various functions of the JobImpl class
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TestJobImpl {
+  
+  @Test
+  public void testJobNoTasksTransition() { 
+    JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
+    JobImpl mockJob = mock(JobImpl.class);
+
+    // Force checkJobCompleteSuccess to return null
+    Task mockTask = mock(Task.class);
+    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+    tasks.put(mockTask.getID(), mockTask);
+    mockJob.tasks = tasks;
+
+    when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
+    JobEvent mockJobEvent = mock(JobEvent.class);
+    JobStateInternal state = trans.transition(mockJob, mockJobEvent);
+    Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
+        JobStateInternal.ERROR, state);
+  }
+
+  @Test
+  public void testCommitJobFailsJob() {
+
+    JobImpl mockJob = mock(JobImpl.class);
+    mockJob.tasks = new HashMap<TaskId, Task>();
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    JobContext mockJobContext = mock(JobContext.class);
+
+    when(mockJob.getCommitter()).thenReturn(mockCommitter);
+    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
+    when(mockJob.getJobContext()).thenReturn(mockJobContext);
+    doNothing().when(mockJob).setFinishTime();
+    doNothing().when(mockJob).logJobHistoryFinishedEvent();
+    when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(JobStateInternal.KILLED);
+    when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(JobStateInternal.FAILED);
+    when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(JobStateInternal.SUCCEEDED);
+
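+    // commitJob is stubbed to throw below, so checkJobCompleteSuccess is expected to
+    // translate the failed commit into the FAILED internal state asserted further down.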
+    try {
+      doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
+    } catch (IOException e) {
+      // commitJob stubbed out, so this can't happen
+    }
+    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
+      "for successful job",
+      JobImpl.checkJobCompleteSuccess(mockJob));
+    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
+        JobStateInternal.FAILED, JobImpl.checkJobCompleteSuccess(mockJob));
+  }
+
+  @Test
+  public void testCheckJobCompleteSuccess() {
+    
+    JobImpl mockJob = mock(JobImpl.class);
+    mockJob.tasks = new HashMap<TaskId, Task>();
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    JobContext mockJobContext = mock(JobContext.class);
+    
+    when(mockJob.getCommitter()).thenReturn(mockCommitter);
+    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
+    when(mockJob.getJobContext()).thenReturn(mockJobContext);
+    doNothing().when(mockJob).setFinishTime();
+    doNothing().when(mockJob).logJobHistoryFinishedEvent();
+    when(mockJob.finished(any(JobStateInternal.class))).thenReturn(JobStateInternal.SUCCEEDED);
+
+    try {
+      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
+    } catch (IOException e) {
+      // commitJob stubbed out, so this can't happen
+    }
+    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
+      "for successful job",
+      JobImpl.checkJobCompleteSuccess(mockJob));
+    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
+        JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+  }
+
+  @Test
+  public void testCheckJobCompleteSuccessFailed() {
+    JobImpl mockJob = mock(JobImpl.class);
+
+    // Make completedTasks not match getTasks()
+    Task mockTask = mock(Task.class);
+    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+    tasks.put(mockTask.getID(), mockTask);
+    mockJob.tasks = tasks;
+    
+    try {
+      // Just in case the code breaks and reaches these calls
+      OutputCommitter mockCommitter = mock(OutputCommitter.class);
+      EventHandler mockEventHandler = mock(EventHandler.class);
+      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
+      doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    } catch (IOException e) {
+      e.printStackTrace();    
+    }
+    Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
+      "for unsuccessful job",
+      JobImpl.checkJobCompleteSuccess(mockJob));
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    TestJobImpl t = new TestJobImpl();
+    t.testJobNoTasksTransition();
+    t.testCheckJobCompleteSuccess();
+    t.testCheckJobCompleteSuccessFailed();
+    t.testCheckAccess();
+  }
+
+  @Test
+  public void testCheckAccess() {
+    // Create two unique users
+    String user1 = System.getProperty("user.name");
+    String user2 = user1 + "1234";
+    UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user1);
+    UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser(user2);
+
+    // Create the job
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+
+    // Setup configuration access only to user1 (owner)
+    Configuration conf1 = new Configuration();
+    conf1.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf1.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access to the user1 (owner) and user2
+    Configuration conf2 = new Configuration();
+    conf2.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf2.set(MRJobConfig.JOB_ACL_VIEW_JOB, user2);
+
+    // Verify access
+    JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access with security enabled and access to all
+    Configuration conf3 = new Configuration();
+    conf3.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf3.set(MRJobConfig.JOB_ACL_VIEW_JOB, "*");
+
+    // Verify access
+    JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access without security enabled
+    Configuration conf4 = new Configuration();
+    conf4.setBoolean(MRConfig.MR_ACLS_ENABLED, false);
+    conf4.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access without security enabled
+    Configuration conf5 = new Configuration();
+    conf5.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf5.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job5.checkAccess(ugi1, null));
+    Assert.assertTrue(job5.checkAccess(ugi2, null));
+  }
+
+  @Test
+  public void testUberDecision() throws Exception {
+
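+    // Each case below runs InitTransition with the stubbed createSplits() from
+    // getInitTransition(), which always produces two map splits.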
+    // with default values, no of maps is 2
+    Configuration conf = new Configuration();
+    boolean isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+
+    // enable uber mode, no of maps is 2
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
+
+    // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
+    // reduces is 0
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 0);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+
+    // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
+    // reduces is 1
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
+
+    // enable uber mode, no of maps is 2 and uber task max maps is 1
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+  }
+  
+  @Test
+  public void testReportedAppProgress() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
+    
+    int numMaps = 10;
+    int numReduces = 10;
+    int numTasks = numMaps + numReduces;
+    Configuration conf = new Configuration();
+    MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+        appAttemptId, 0), numMaps, numReduces, false,
+        this.getClass().getName(), true, 1) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+    };
+    Job job = mrApp.submit(conf);
+    DrainDispatcher dispatcher = (DrainDispatcher) mrApp.getDispatcher();
+    
+    mrApp.waitForState(job, JobState.RUNNING);
+    
+    // Empty the queue. All Attempts in RUNNING state.
+    // Using waitForState can be slow.
+
+    dispatcher.await();
+    // At this point, setup is complete. Tasks may be running.
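+    // Progress model exercised here: setup accounts for 0.05 and the task phase for
+    // 0.9 spread evenly across all tasks, so each finished task adds 0.9 / numTasks.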
+    float expected = 0.05f;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    Iterator<Task> it = job.getTasks().values().iterator();
+
+    // finish 1 map.
+    int toFinish = 1;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+      
+    // finish 7 more maps.
+    toFinish = 7;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    // finish remaining 2 maps.
+    toFinish = 2;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    // finish 2 reduces
+    toFinish = 2;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    // finish remaining 8 reduces.
+    toFinish = 8;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    mrApp.waitForState(job, JobState.SUCCEEDED);
+  }
+  
+  @Test
+  // Refer to comments for the previous test.
+  public void testReportedAppProgressWithOnlyMaps() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
+    
+    int numMaps = 10;
+    int numReduces = 0;
+    int numTasks = numMaps + numReduces;
+    Configuration conf = new Configuration();
+    MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(
+        appAttemptId, 0), numMaps, numReduces, false,
+        this.getClass().getName(), true, 1) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+    };
+    Job job = mrApp.submit(conf);
+    DrainDispatcher dispatcher = (DrainDispatcher) mrApp.getDispatcher();
+    
+    mrApp.waitForState(job, JobState.RUNNING);
+    
+    // Empty the queue. All Attempts in RUNNING state.
+    // Using waitForState can be slow.
+
+    dispatcher.await();
+    // At this point, setup is complete. Tasks may be running.
+    float expected = 0.05f;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    
+    Iterator<Task> it = job.getTasks().values().iterator();
+
+    // finish 1 map.
+    int toFinish = 1;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+
+    // finish 5 more maps.
+    toFinish = 5;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+
+    // finish the rest.
+    toFinish = 4;
+    finishNextNTasks(mrApp, it, toFinish, dispatcher);
+    expected += toFinish * 0.9/numTasks;
+    Assert.assertEquals(expected, job.getProgress(), 0.001f);
+    // TODO This last verification could race with job completion. Since the AM never
+    // goes beyond 0.95f, this is OK for now.
+    
+    // TODO Ideally MRApp should provide a way to signal job completion, i.e. not
+    // auto-complete a job after all tasks have completed. Use in the previous test
+    // as well.
+    mrApp.waitForState(job, JobState.SUCCEEDED);
+  }
+  
+  private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int n,
+      DrainDispatcher dispatcher) throws Exception {
+    finishNextNTasks(mrApp, it, n);
+    dispatcher.await();
+  }
+  
+  private void finishNextNTasks(MRApp mrApp, Iterator<Task> it, int n)
+      throws Exception {
+    for (int i = 0; i < n; i++) {
+      if (!it.hasNext()) {
+        throw new RuntimeException("Attempt to finish a non-existing task");
+      }
+      Task task = it.next();
+      finishTask(mrApp, task);
+    }
+  }
+
+  private void finishTask(MRApp mrApp, Task task) throws Exception {
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    mrApp.sendFinishToTaskAttempt(attempt.getID(), TaskAttemptState.SUCCEEDED,
+        false);
+  }
+
+  private boolean testUberDecision(Configuration conf) {
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+    JobImpl job = new JobImpl(jobId, Records
+        .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
+        null, mock(JobTokenSecretManager.class), null, null, null,
+        mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null, null);
+    InitTransition initTransition = getInitTransition();
+    JobEvent mockJobEvent = mock(JobEvent.class);
+    initTransition.transition(job, mockJobEvent);
+    boolean isUber = job.isUber();
+    return isUber;
+  }
+
+  private InitTransition getInitTransition() {
+    InitTransition initTransition = new InitTransition() {
+      @Override
+      protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+        return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
+            new TaskSplitMetaInfo() };
+      }
+    };
+    return initTransition;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestMapReduceChildJVM.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.junit.Test;
+
+public class TestMapReduceChildJVM {
+
+  private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
+
+  @Test
+  public void testCommandLine() throws Exception {
+
+    MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
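+    // The command line below is captured by MyMRApp's mock container launcher; the
+    // <LOG_DIR> token and the fixed host/port/ids presumably come from MRApp's mocked
+    // launch environment rather than a real container.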
+    Assert.assertEquals(
+      "[exec $JAVA_HOME/bin/java" +
+      " -Djava.net.preferIPv4Stack=true" +
+      " -Dhadoop.metrics.log.level=WARN" +
+      "  -Xmx200m -Djava.io.tmpdir=$PWD/tmp" +
+      " -Dlog4j.configuration=container-log4j.properties" +
+      " -Dyarn.app.mapreduce.container.log.dir=<LOG_DIR>" +
+      " -Dyarn.app.mapreduce.container.log.filesize=0" +
+      " -Dhadoop.root.logger=INFO,CLA" +
+      " org.apache.tez.mapreduce.task.impl.YarnTezChild 127.0.0.1" +
+      " 54321" +
+      " job_0_0000" +
+      " MAP" +
+      " container_0_0000_01_000000" +
+      " 1><LOG_DIR>/stdout" +
+      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+  }
+
+  private static final class MyMRApp extends MRApp {
+
+    private String myCommandLine;
+
+    public MyMRApp(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new MockContainerLauncher() {
+        @Override
+        public void handle(NMCommunicatorEvent event) {
+          if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
+            NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+            ContainerLaunchContext launchContext = launchEvent.getContainerLaunchContext();
+            String cmdString = launchContext.getCommands().toString();
+            LOG.info("launchContext " + cmdString);
+            myCommandLine = cmdString;
+          }
+          super.handle(event);
+        }
+      };
+    }
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskAttempt.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapTaskAttemptImpl2;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminated;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventContainerTerminating;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStartedRemotely;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventSchedule;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.mapreduce.task.MapOnlyTask;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TestTaskAttempt {
+
+  // TODO TEZAM4 This may need to be test specific.
+  private static final String MAP_ONLY_TASK_CLASS_NAME = MapOnlyTask.class.getName();
+  
+  static public class StubbedFS extends RawLocalFileSystem {
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return new FileStatus(1, false, 1, 1, 1, f);
+    }
+  }
+
+  @Test
+  public void testMRAppHistoryForMap() throws Exception {
+    MRApp app = new FailingAttemptsMRApp(1, 0);
+    testMRAppHistory(app);
+  }
+
+  @Test
+  public void testMRAppHistoryForReduce() throws Exception {
+    MRApp app = new FailingAttemptsMRApp(0, 1);
+    testMRAppHistory(app);
+  }
+
+  @Test
+  public void testSingleRackRequest() throws Exception {
+    TaskAttemptImpl.ScheduleTaskattemptTransition sta =
+        new TaskAttemptImpl.ScheduleTaskattemptTransition();
+
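+    // The transition is expected to emit two events; the second should be the
+    // AMSchedulerTALaunchRequestEvent carrying the host/rack request, as verified below.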
+    EventHandler eventHandler = mock(EventHandler.class);
+    String[] hosts = new String[3];
+    hosts[0] = "host1";
+    hosts[1] = "host2";
+    hosts[2] = "host3";
+    TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0,
+        128 * 1024 * 1024l);
+
+    TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImpl2ForTest(
+        eventHandler, splitInfo);
+    TaskAttemptEventSchedule mockTAEvent = mock(TaskAttemptEventSchedule.class);
+    doReturn(false).when(mockTAEvent).isRescheduled();
+
+    sta.transition(mockTaskAttempt, mockTAEvent);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    if (!(arg.getAllValues().get(1) instanceof AMSchedulerTALaunchRequestEvent)) {
+      Assert.fail("Second Event not of type ContainerRequestEvent");
+    }
+    AMSchedulerTALaunchRequestEvent tlrE = (AMSchedulerTALaunchRequestEvent) arg
+        .getAllValues().get(1);
+    String[] requestedRacks = tlrE.getRacks();
+    // Only a single occurrence of /DefaultRack
+    assertEquals(1, requestedRacks.length);
+  }
+
+  @Test
+  public void testHostResolveAttempt() throws Exception {
+    TaskAttemptImpl.ScheduleTaskattemptTransition sta =
+        new TaskAttemptImpl.ScheduleTaskattemptTransition();
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    String hosts[] = new String[] {"192.168.1.1", "host2", "host3"};
+    String resolved[] = new String[] {"host1", "host2", "host3"};
+    TaskSplitMetaInfo splitInfo =
+        new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+
+    TaskAttemptImpl mockTaskAttempt =
+        createMapTaskAttemptImpl2ForTest(eventHandler, splitInfo);
+    TaskAttemptImpl spyTa = spy(mockTaskAttempt);
+    when(spyTa.resolveHosts(hosts)).thenReturn(resolved);
+
+    TaskAttemptEventSchedule mockTAEvent = mock(TaskAttemptEventSchedule.class);
+    doReturn(false).when(mockTAEvent).isRescheduled();
+
+    sta.transition(spyTa, mockTAEvent);
+    verify(spyTa).resolveHosts(hosts);
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    if (!(arg.getAllValues().get(1) instanceof AMSchedulerTALaunchRequestEvent)) {
+      Assert.fail("Second Event not of type ContainerRequestEvent");
+    }
+    Map<String, Boolean> expected = new HashMap<String, Boolean>();
+    expected.put("host1", true);
+    expected.put("host2", true);
+    expected.put("host3", true);
+    AMSchedulerTALaunchRequestEvent cre =
+        (AMSchedulerTALaunchRequestEvent) arg.getAllValues().get(1);
+    String[] requestedHosts = cre.getHosts();
+    for (String h : requestedHosts) {
+      expected.remove(h);
+    }
+    assertEquals(0, expected.size());
+  }
+  
+  @Test
+  public void testSlotMillisCounterUpdate() throws Exception {
+    verifySlotMillis(2048, 2048, 1024);
+    verifySlotMillis(2048, 1024, 1024);
+    verifySlotMillis(10240, 1024, 2048);
+  }
+
+  public void verifySlotMillis(int mapMemMb, int reduceMemMb,
+      int minContainerSize) throws Exception {
+    Clock actualClock = new SystemClock();
+    ControlledClock clock = new ControlledClock(actualClock);
+    clock.setTime(10);
+    MRApp app =
+        new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock);
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
+    conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
+    app.setClusterInfo(new ClusterInfo(BuilderUtils
+        .newResource(minContainerSize, 1), BuilderUtils.newResource(10240,1)));
+
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 2, tasks.size());
+    Iterator<Task> taskIter = tasks.values().iterator();
+    Task mTask = taskIter.next();
+    app.waitForState(mTask, TaskState.RUNNING);
+    Task rTask = taskIter.next();
+    app.waitForState(rTask, TaskState.RUNNING);
+    Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts();
+    Assert.assertEquals("Num attempts is not correct", 1, mAttempts.size());
+    Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts();
+    Assert.assertEquals("Num attempts is not correct", 1, rAttempts.size());
+    TaskAttempt mta = mAttempts.values().iterator().next();
+    TaskAttempt rta = rAttempts.values().iterator().next();
+    app.waitForState(mta, TaskAttemptState.RUNNING);
+    app.waitForState(rta, TaskAttemptState.RUNNING);
+
+    clock.setTime(11);
+    app.getContext()
+        .getEventHandler()
+        .handle(new TaskAttemptEvent(mta.getID(), TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(new TaskAttemptEvent(rta.getID(), TaskAttemptEventType.TA_DONE));
+    app.waitForState(job, JobState.SUCCEEDED);
+    Assert.assertEquals(mta.getFinishTime(), 11);
+    Assert.assertEquals(mta.getLaunchTime(), 10);
+    Assert.assertEquals(rta.getFinishTime(), 11);
+    Assert.assertEquals(rta.getLaunchTime(), 10);
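+    // With the controlled clock each attempt ran for 11 - 10 = 1 ms, so the
+    // SLOTS_MILLIS counters asserted below reduce to the slot count,
+    // ceil(taskMemMb / minContainerSize).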
+    Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
+        job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS)
+            .getValue());
+    Assert.assertEquals(
+        (int) Math.ceil((float) reduceMemMb / minContainerSize), job
+            .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
+            .getValue());
+  }
+  
+  private TaskAttemptImpl createMapTaskAttemptImpl2ForTest(
+      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
+    Clock clock = new SystemClock();
+    return createMapTaskAttemptImpl2ForTest(eventHandler, taskSplitMetaInfo, clock);
+  }
+  
+  private TaskAttemptImpl createMapTaskAttemptImpl2ForTest(
+      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    Path jobFile = mock(Path.class);
+    JobConf jobConf = new JobConf();
+    OutputCommitter outputCommitter = mock(OutputCommitter.class);
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl2(taskId, 1, eventHandler, jobFile, 1,
+            taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+            null, clock, mock(TaskHeartbeatHandler.class), null,
+            MAP_ONLY_TASK_CLASS_NAME);
+    return taImpl;
+  }
+
+  private void testMRAppHistory(MRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Map<TaskId, Task> tasks = job.getTasks();
+
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.FAILED, task
+        .getReport().getTaskState());
+    Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
+        .getAttempts();
+    Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
+
+    Iterator<TaskAttempt> it = attempts.values().iterator();
+    TaskAttemptReport report = it.next().getReport();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+        report.getTaskAttemptState());
+    Assert.assertEquals("Diagnostic Information is not Correct",
+        "Test Diagnostic Event", report.getDiagnosticInfo());
+    report = it.next().getReport();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+        report.getTaskAttemptState());
+  }
+
+  static class FailingAttemptsMRApp extends MRApp {
+    FailingAttemptsMRApp(int maps, int reduces) {
+      super(maps, reduces, true, "FailingAttemptsMRApp", true);
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEventFailRequest(attemptID, "Test Diagnostic Event"));
+    }
+
+    // TODO This will execute in a separate thread. The assert is not very useful.
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new EventHandler<JobHistoryEvent>() {
+        @Override
+        public void handle(JobHistoryEvent event) {
+          if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.EventType.MAP_ATTEMPT_FAILED) {
+            TaskAttemptUnsuccessfulCompletion datum = (TaskAttemptUnsuccessfulCompletion) event
+                .getHistoryEvent().getDatum();
+            // TODO Useless assert in a separate thread. Causes the test to hang.
+            Assert.assertEquals("Diagnostic Information is not Correct",
+                "Test Diagnostic Event", datum.get(8).toString());
+          }
+        }
+      };
+    }
+  }
+
+  @Test
+  public void testLaunchFailedWhileKilling() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = 
+      BuilderUtils.newApplicationAttemptId(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+    
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+    
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+    
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+    
+    AppContext mockAppContext = mock(AppContext.class);
+    doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
+    
+    TaskAttemptImpl taImpl =
+      new MapTaskAttemptImpl2(taskId, 1, eventHandler, jobFile, 1,
+          splits, jobConf, taListener,
+          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext,
+          MAP_ONLY_TASK_CLASS_NAME);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    
+    taImpl.handle(new TaskAttemptEventSchedule(attemptId, false));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventKillRequest(attemptId, null));
+    // At some KILLING state.
+    taImpl.handle(new TaskAttemptEventContainerTerminating(attemptId, null));
+    assertFalse(eventHandler.internalError);
+  }
+
+  // TODO Add a similar test for TERMINATING.
+  @Test
+  public void testContainerTerminatedWhileRunning() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl2(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        mock(Token.class), new Credentials(), new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, MAP_ONLY_TASK_CLASS_NAME);
+
+    taImpl.handle(new TaskAttemptEventSchedule(attemptId, false));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(attemptId, contId, null, -1));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEventContainerTerminated(attemptId, null));
+    assertFalse(
+        "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
+        eventHandler.internalError);
+  }
+
+  @Test
+  public void testContainerTerminatedWhileCommitting() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl2(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        mock(Token.class), new Credentials(), new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, MAP_ONLY_TASK_CLASS_NAME);
+
+    taImpl.handle(new TaskAttemptEventSchedule(attemptId, false));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(attemptId, contId, null, -1));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_COMMIT_PENDING));
+    assertEquals("Task attempt is not in commit pending state",
+        taImpl.getState(), TaskAttemptState.COMMIT_PENDING);
+    taImpl.handle(new TaskAttemptEventContainerTerminated(attemptId, null));
+    assertFalse(
+        "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
+        eventHandler.internalError);
+  }
+
+  @Test
+  public void testMultipleTooManyFetchFailures() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl2(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        mock(Token.class), new Credentials(), new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, MAP_ONLY_TASK_CLASS_NAME);
+
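+    // A succeeded attempt that receives TA_TOO_MANY_FETCH_FAILURES should move to
+    // FAILED, and a duplicate of the event should be absorbed without an internal error.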
+    taImpl.handle(new TaskAttemptEventSchedule(attemptId, false));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(attemptId, contId, null, -1));
+    taImpl
+        .handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
+    assertEquals("Task attempt is not in FAILED state, still",
+        taImpl.getState(), TaskAttemptState.FAILED);
+    assertFalse("InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES",
+        eventHandler.internalError);
+  }
+
+  public static class MockEventHandler implements EventHandler {
+    public boolean internalError;
+    
+    @Override
+    public void handle(Event event) {
+      if (event instanceof JobEvent) {
+        JobEvent je = ((JobEvent) event);
+        if (JobEventType.INTERNAL_ERROR == je.getType()) {
+          internalError = true;
+        }
+      }
+    }
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestTaskImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventTAUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.mapreduce.task.MapOnlyTask;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings("rawtypes")
+public class TestTaskImpl {
+
+  private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);    
+  
+  private JobConf conf;
+  private TaskAttemptListener taskAttemptListener;
+  private TaskHeartbeatHandler taskHeartbeatHandler;
+  private OutputCommitter committer;
+  private Token<JobTokenIdentifier> jobToken;
+  private JobId jobId;
+  private Path remoteJobConfFile;
+  private Credentials credentials;
+  private Clock clock;
+  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
+  private MRAppMetrics metrics;
+  private TaskImpl mockTask;
+  private ApplicationId appId;
+  private TaskSplitMetaInfo taskSplitMetaInfo;  
+  private String[] dataLocations = new String[0]; 
+  private final TaskType taskType = TaskType.MAP;
+  private AppContext appContext;
+  
+  private int startCount = 0;
+  private int taskCounter = 0;
+  private final int partition = 1;
+  
+  private InlineDispatcher dispatcher;   
+  private List<MockTaskAttemptImpl> taskAttempts;
+  
+  private class MockTaskImpl extends TaskImpl {
+        
+    private int taskAttemptCounter = 0;
+
+    public MockTaskImpl(JobId jobId, int partition,
+        EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
+        TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+        Token<JobTokenIdentifier> jobToken,
+        Credentials credentials, Clock clock,
+        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
+        MRAppMetrics metrics, TaskHeartbeatHandler thh, AppContext appContext,
+        String tezModuleClassName) {
+      super(jobId, taskType , partition, eventHandler,
+          remoteJobConfFile, conf, taskAttemptListener, committer, 
+          jobToken, credentials, clock,
+          completedTasksFromPreviousRun, startCount, metrics, thh, appContext,
+          tezModuleClassName);
+    }
+
+    @Override
+    public TaskType getType() {
+      return taskType;
+    }
+
+    @Override
+    protected TaskAttemptImpl createAttempt() {
+      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, 
+          eventHandler, taskAttemptListener, remoteJobConfFile, partition,
+          conf, committer, jobToken, credentials, clock, taskHeartbeatHandler, 
+          appContext, tezModuleClassName);
+      taskAttempts.add(attempt);
+      return attempt;
+    }
+
+    @Override
+    protected int getMaxAttempts() {
+      return 100;
+    }
+
+    @Override
+    protected void internalError(TaskEventType type) {
+      super.internalError(type);
+      fail("Internal error: " + type);
+    }
+  }
+  
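+  // MockTaskAttemptImpl is a lightweight attempt whose progress and state can
+  // be set directly by tests; createRemoteMRTaskContext() returns null because
+  // no container is ever launched here.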
+  private class MockTaskAttemptImpl extends TaskAttemptImpl {
+
+    private float progress = 0;
+    private TaskAttemptState state = TaskAttemptState.NEW;
+    private TaskAttemptId attemptId;
+
+    public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
+        JobConf conf, OutputCommitter committer,
+        Token<JobTokenIdentifier> jobToken,
+        Credentials credentials, Clock clock,
+        TaskHeartbeatHandler thh, AppContext appContext,
+        String tezModuleClassName) {
+      super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
+          dataLocations, committer, jobToken, credentials, clock, thh,
+          appContext, tezModuleClassName);
+
+      attemptId = Records.newRecord(TaskAttemptId.class);
+      attemptId.setId(id);
+      attemptId.setTaskId(taskId);
+    }
+
+    public TaskAttemptId getAttemptId() {
+      return attemptId;
+    }
+    
+    @Override
+    protected MRTaskContext createRemoteMRTaskContext() {
+      // TODO TEZAM3
+      return null;
+    }    
+    
+    public float getProgress() {
+      return progress;
+    }
+    
+    public void setProgress(float progress) {
+      this.progress = progress;
+    }
+    
+    public void setState(TaskAttemptState state) {
+      this.state = state;
+    }
+    
+    public TaskAttemptState getState() {
+      return state;
+    }
+    
+  }
+
+  // TODO EVENTUALLY Fix this, with the rewired TaskContext and actual Processor.
+  private class MockTask extends Task {
+
+    @Override
+    public void run(JobConf job, TaskUmbilicalProtocol umbilical)
+        throws IOException, ClassNotFoundException, InterruptedException {
+      return;
+    }
+
+    @Override
+    public boolean isMapTask() {
+      return true;
+    }    
+    
+  }
+  
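+  // setup() builds a fresh MockTaskImpl before each test, with mocked
+  // collaborators (listeners, committer, metrics, split info) and a real
+  // SystemClock and InlineDispatcher.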
+  @Before 
+  @SuppressWarnings("unchecked")
+  public void setup() {
+    dispatcher = new InlineDispatcher();
+    
+    ++startCount;
+    
+    conf = new JobConf();
+    taskAttemptListener = mock(TaskAttemptListener.class);
+    taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    committer = mock(OutputCommitter.class);
+    jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
+    remoteJobConfFile = mock(Path.class);
+    credentials = null;
+    clock = new SystemClock();
+    metrics = mock(MRAppMetrics.class);  
+    dataLocations = new String[1];
+    
+    appId = Records.newRecord(ApplicationId.class);
+    appId.setClusterTimestamp(System.currentTimeMillis());
+    appId.setId(1);
+
+    jobId = Records.newRecord(JobId.class);
+    jobId.setId(1);
+    jobId.setAppId(appId);
+    appContext = mock(AppContext.class);
+
+    taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+    when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); 
+    
+    taskAttempts = new ArrayList<MockTaskAttemptImpl>();
+    
+    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        credentials, clock,
+        completedTasksFromPreviousRun, startCount,
+        metrics, taskHeartbeatHandler, appContext, MapOnlyTask.class.getName());
+    // TODO TEZAM4 Fix this - should likely not be hardcoded to MapOnlyTask.
+    // Depends on what the unit test is doing.
+    
+  }
+
+  @After 
+  public void teardown() {
+    taskAttempts.clear();
+  }
+  
+  private TaskId getNewTaskID() {
+    TaskId taskId = Records.newRecord(TaskId.class);
+    taskId.setId(++taskCounter);
+    taskId.setJobId(jobId);
+    taskId.setTaskType(mockTask.getType());    
+    return taskId;
+  }
+  
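+  // The helpers below drive the task state machine by passing TaskEvents and
+  // TaskEventTAUpdates directly to mockTask.handle(), then assert the state
+  // the task is expected to land in.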
+  private void scheduleTaskAttempt(TaskId taskId) {
+    mockTask.handle(new TaskEvent(taskId, 
+        TaskEventType.T_SCHEDULE));
+    assertTaskScheduledState();
+  }
+  
+  private void killTask(TaskId taskId) {
+    mockTask.handle(new TaskEvent(taskId, 
+        TaskEventType.T_KILL));
+    assertTaskKillWaitState();
+  }
+  
+  private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId, 
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskScheduledState();
+  }
+
+  private void launchTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId, 
+        TaskEventType.T_ATTEMPT_LAUNCHED));
+    assertTaskRunningState();    
+  }
+  
+  private void commitTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId, 
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    assertTaskRunningState();    
+  }
+  
+  private MockTaskAttemptImpl getLastAttempt() {
+    return taskAttempts.get(taskAttempts.size()-1);
+  }
+  
+  private void updateLastAttemptProgress(float p) {    
+    getLastAttempt().setProgress(p);
+  }
+
+  private void updateLastAttemptState(TaskAttemptState s) {
+    getLastAttempt().setState(s);
+  }
+  
+  private void killRunningTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId, 
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskRunningState();  
+  }
+  
+  private void failRunningTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId, 
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertTaskRunningState();
+  }
+  
+  /**
+   * {@link TaskState#NEW}
+   */
+  private void assertTaskNewState() {
+    assertEquals(TaskState.NEW, mockTask.getState());
+  }
+  
+  /**
+   * {@link TaskState#SCHEDULED}
+   */
+  private void assertTaskScheduledState() {
+    assertEquals(TaskState.SCHEDULED, mockTask.getState());
+  }
+
+  /**
+   * {@link TaskState#RUNNING}
+   */
+  private void assertTaskRunningState() {
+    assertEquals(TaskState.RUNNING, mockTask.getState());
+  }
+    
+  /**
+   * {@link TaskStateInternal#KILL_WAIT}
+   */
+  private void assertTaskKillWaitState() {
+    assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
+  }
+  
+  /**
+   * {@link TaskState#SUCCEEDED}
+   */
+  private void assertTaskSucceededState() {
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+  }
+  
+  @Test
+  public void testInit() {
+    LOG.info("--- START: testInit ---");
+    assertTaskNewState();
+    assertEquals(0, taskAttempts.size());
+  }
+
+  @Test
+  /**
+   * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
+   */
+  public void testScheduleTask() {
+    LOG.info("--- START: testScheduleTask ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+  }
+  
+  @Test 
+  /**
+   * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT}
+   */
+  public void testKillScheduledTask() {
+    LOG.info("--- START: testKillScheduledTask ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    killTask(taskId);
+  }
+  
+  @Test 
+  /**
+   * Kill attempt
+   * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
+   */
+  public void testKillScheduledTaskAttempt() {
+    LOG.info("--- START: testKillScheduledTaskAttempt ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    killScheduledTaskAttempt(getLastAttempt().getAttemptId());
+  }
+  
+  @Test 
+  /**
+   * Launch attempt
+   * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
+   */
+  public void testLaunchTaskAttempt() {
+    LOG.info("--- START: testLaunchTaskAttempt ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+  }
+
+  @Test
+  /**
+   * Kill running attempt
+   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING} 
+   */
+  public void testKillRunningTaskAttempt() {
+    LOG.info("--- START: testKillRunningTaskAttempt ---");
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    killRunningTaskAttempt(getLastAttempt().getAttemptId());    
+  }
+
+  @Test 
+  public void testTaskProgress() {
+    LOG.info("--- START: testTaskProgress ---");
+        
+    // launch task
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    float progress = 0f;
+    assertTrue(mockTask.getProgress() == progress);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+
+    // update attempt1
+    progress = 50f;
+    updateLastAttemptProgress(progress);
+    assertTrue(mockTask.getProgress() == progress);
+    progress = 100f;
+    updateLastAttemptProgress(progress);
+    assertTrue(mockTask.getProgress() == progress);
+
+    // mark the first attempt as killed; its progress should no longer count
+    progress = 0f;
+    updateLastAttemptState(TaskAttemptState.KILLED);
+    assertTrue(mockTask.getProgress() == progress);
+
+    // kill the first attempt; with no successful attempt yet, this should
+    // trigger a second attempt
+    killRunningTaskAttempt(getLastAttempt().getAttemptId());
+    assertEquals(2, taskAttempts.size());
+
+    assertTrue(mockTask.getProgress() == 0f);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    progress = 50f;
+    updateLastAttemptProgress(progress);
+    assertTrue(mockTask.getProgress() == progress);
+  }
+  
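+  // Commit-failure scenario: the first attempt reaches COMMIT_PENDING and then
+  // fails, which should spawn a second attempt; only the attempt that
+  // eventually succeeds should be allowed to commit via canCommit().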
+  @Test
+  public void testFailureDuringTaskAttemptCommit() {
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+
+    // During the task attempt commit there is an exception which causes
+    // the attempt to fail
+    updateLastAttemptState(TaskAttemptState.FAILED);
+    failRunningTaskAttempt(getLastAttempt().getAttemptId());
+
+    assertEquals(2, taskAttempts.size());
+    updateLastAttemptState(TaskAttemptState.SUCCEEDED);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskEventTAUpdate(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    
+    assertFalse("First attempt should not commit",
+        mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
+    assertTrue("Second attempt should commit",
+        mockTask.canCommit(getLastAttempt().getAttemptId()));
+
+    assertTaskSucceededState();
+  }
+  
+  @Test
+  public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt that succeeds
+    mockTask.handle(new TaskEventTAUpdate(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskEventTAUpdate(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    
+    // The task should now have succeeded
+    assertTaskSucceededState();
+    
+    // Now fail the first task attempt, after the second has succeeded
+    mockTask.handle(new TaskEventTAUpdate(taskAttempts.get(0).getAttemptId(), 
+        TaskEventType.T_ATTEMPT_FAILED));
+    
+    // The task should still be in the succeeded state
+    assertTaskSucceededState();
+    
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/launcher/TestContainerLauncher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,415 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.launcher;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.MRApp;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestContainerLauncher {
+
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+  Configuration conf;
+  Server server;
+
+  static final Log LOG = LogFactory.getLog(TestContainerLauncher.class);
+
+  @Test
+  public void testPoolSize() throws InterruptedException {
+
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+      appId, 3);
+
+    AppContext context = mock(AppContext.class);
+    AMNodeMap nodes = new AMNodeMap(mock(EventHandler.class), context);
+    doReturn(nodes).when(context).getAllNodes();
+    CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
+      context);
+    containerLauncher.init(new Configuration());
+    containerLauncher.start();
+
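+    // CustomContainerLauncher verifies the launcher's core pool size (against
+    // expectedCorePoolSize) each time an event processor is created and records
+    // any mismatch in foundErrors, which the assertions below require to stay
+    // null as events for new hosts arrive.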
+    ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
+
+    // No events yet
+    Assert.assertEquals(0, threadPool.getPoolSize());
+    Assert.assertEquals(ContainerLauncherImpl.INITIAL_POOL_SIZE,
+      threadPool.getCorePoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    for (int i = 0; i < 10; i++) {
+      ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i);
+      NodeId nodeId = BuilderUtils.newNodeId("host" + i, 1234);
+      nodes.nodeSeen(nodeId);
+      containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId,
+          null, NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    }
+    waitForEvents(containerLauncher, 10);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // Same set of hosts, so no change
+    containerLauncher.finishEventHandling = true;
+    int timeOut = 0;
+    while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) {
+      LOG.info("Waiting for number of events processed to become " + 10
+          + ". It is now " + containerLauncher.numEventsProcessed.get()
+          + ". Timeout is " + timeOut);
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
+    containerLauncher.finishEventHandling = false;
+    for (int i = 0; i < 10; i++) {
+      ContainerId containerId = BuilderUtils.newContainerId(appAttemptId,
+          i + 10);
+      NodeId nodeId = BuilderUtils.newNodeId("host" + i, 1234);
+      nodes.nodeSeen(nodeId);
+      containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId,
+          null, NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    }
+    waitForEvents(containerLauncher, 20);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // A different host: the core thread pool size should increase to
+    // 21 (11 hosts + 10 buffer), while the live pool size should only be 11.
+    containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    containerLauncher.finishEventHandling = false;
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
+    NodeId nodeId = BuilderUtils.newNodeId("host11", 1234);
+    nodes.nodeSeen(nodeId);
+    containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId, null,
+        NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    waitForEvents(containerLauncher, 21);
+    Assert.assertEquals(11, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    containerLauncher.stop();
+  }
+
+  @Test
+  public void testPoolLimits() throws InterruptedException {
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+      appId, 3);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
+
+    AppContext context = mock(AppContext.class);
+    AMNodeMap nodes = new AMNodeMap(mock(EventHandler.class), context);
+    when(context.getAllNodes()).thenReturn(nodes);
+    CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
+      context);
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, 12);
+    containerLauncher.init(conf);
+    containerLauncher.start();
+
+    ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
+
+    // 10 different hosts
+    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    for (int i = 0; i < 10; i++) {
+      NodeId nodeId = BuilderUtils.newNodeId("host" + i, 1234);
+      nodes.nodeSeen(nodeId);
+      containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId,
+          null, NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    }
+    waitForEvents(containerLauncher, 10);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // 4 more different hosts, but thread pool size should be capped at 12
+    containerLauncher.expectedCorePoolSize = 12;
+    for (int i = 1; i <= 4; i++) {
+      NodeId nodeId = BuilderUtils.newNodeId("host1" + i, 1234);
+      nodes.nodeSeen(nodeId);
+      containerLauncher.handle(new NMCommunicatorEvent(containerId, nodeId,
+          null, NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST));
+    }
+    waitForEvents(containerLauncher, 12);
+    Assert.assertEquals(12, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // Make some threads idle so that the remaining events are also processed.
+    containerLauncher.finishEventHandling = true;
+    waitForEvents(containerLauncher, 14);
+    Assert.assertEquals(12, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    containerLauncher.stop();
+  }
+
+  private void waitForEvents(CustomContainerLauncher containerLauncher,
+      int expectedNumEvents) throws InterruptedException {
+    int timeOut = 0;
+    while (containerLauncher.numEventsProcessing.get() < expectedNumEvents
+        && timeOut++ < 20) {
+      LOG.info("Waiting for number of events to become " + expectedNumEvents
+          + ". It is now " + containerLauncher.numEventsProcessing.get());
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(expectedNumEvents,
+      containerLauncher.numEventsProcessing.get());
+  }
+
+  @Test
+  public void testSlowNM() throws Exception {
+    test();
+  }
+
+  private void test() throws Exception {
+
+    conf = new Configuration();
+    int maxAttempts = 1;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    // set timeout low for the test
+    conf.setInt("yarn.rpc.nm-command-timeout", 3000);
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    server = rpc.getServer(ContainerManager.class, new DummyContainerManager(),
+        addr, conf, null, 1);
+    server.start();
+
+    MRApp app = new MRAppWithSlowNM();
+
+    try {
+      Job job = app.submit(conf);
+      app.waitForState(job, JobState.RUNNING);
+
+      Map<TaskId, Task> tasks = job.getTasks();
+      Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+
+      Task task = tasks.values().iterator().next();
+      app.waitForState(task, TaskState.SCHEDULED);
+
+      Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
+          .next().getAttempts();
+      Assert.assertEquals("Num attempts is not correct", maxAttempts,
+          attempts.size());
+
+      TaskAttempt attempt = attempts.values().iterator().next();
+      app.waitForInternalState((TaskAttemptImpl) attempt,
+          TaskAttemptStateInternal.START_WAIT);
+
+      app.waitForState(job, JobState.FAILED);
+
+      String diagnostics = attempt.getDiagnostics().toString();
+      LOG.info("attempt.getDiagnostics: " + diagnostics);
+
+      Assert.assertTrue(diagnostics.contains("Container launch failed for "
+          + "container_0_0000_01_000000 : "));
+      Assert.assertTrue(diagnostics
+          .contains("java.net.SocketTimeoutException: 3000 millis timeout while waiting for channel"));
+
+    } finally {
+      server.stop();
+      app.stop();
+    }
+  }
+
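+  // CustomContainerLauncher swaps in an EventProcessor that stalls until
+  // finishEventHandling is set, so the tests above can keep launcher threads
+  // busy and observe how the pool grows; numEventsProcessing/numEventsProcessed
+  // track progress and foundErrors captures any unexpected core pool size.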
+  private final class CustomContainerLauncher extends ContainerLauncherImpl {
+
+    private volatile int expectedCorePoolSize = 0;
+    private AtomicInteger numEventsProcessing = new AtomicInteger(0);
+    private AtomicInteger numEventsProcessed = new AtomicInteger(0);
+    private volatile String foundErrors = null;
+    private volatile boolean finishEventHandling;
+
+    private CustomContainerLauncher(AppContext context) {
+      super(context);
+    }
+
+    public ThreadPoolExecutor getThreadPool() {
+      return super.launcherPool;
+    }
+
+    private final class CustomEventProcessor extends
+        ContainerLauncherImpl.EventProcessor {
+      private final NMCommunicatorEvent event;
+
+      private CustomEventProcessor(NMCommunicatorEvent event) {
+        super(event);
+        this.event = event;
+      }
+
+      @Override
+      public void run() {
+        // do nothing substantial
+
+        LOG.info("Processing the event " + event.toString());
+
+        numEventsProcessing.incrementAndGet();
+        // Stall
+        while (!finishEventHandling) {
+          synchronized (this) {
+            try {
+              wait(1000);
+            } catch (InterruptedException e) {
+              // ignore and keep waiting until finishEventHandling is set
+            }
+          }
+        }
+        numEventsProcessed.incrementAndGet();
+      }
+    }
+
+    protected ContainerLauncherImpl.EventProcessor createEventProcessor(
+        final NMCommunicatorEvent event) {
+      // At this point of time, the EventProcessor is being created and so no
+      // additional threads would have been created.
+
+      // Core-pool-size should have increased by now.
+      if (expectedCorePoolSize != launcherPool.getCorePoolSize()) {
+        foundErrors = "Expected " + expectedCorePoolSize + " but found "
+            + launcherPool.getCorePoolSize();
+      }
+
+      return new CustomEventProcessor(event);
+    }
+  }
+
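+  // MRAppWithSlowNM points the launcher's ContainerManager proxy at the local
+  // DummyContainerManager started in test(); its startContainer() sleeps well
+  // past the 3000ms nm-command-timeout configured above, so the container
+  // launch is expected to fail with the SocketTimeoutException asserted in the
+  // attempt diagnostics.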
+  private class MRAppWithSlowNM extends MRApp {
+
+    public MRAppWithSlowNM() {
+      super(1, 0, false, "TestContainerLauncher", true);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new ContainerLauncherImpl(context) {
+        @Override
+        protected ContainerManager getCMProxy(ContainerId containerID,
+            String containerManagerBindAddr, ContainerToken containerToken)
+            throws IOException {
+          // make proxy connect to our local containerManager server
+          ContainerManager proxy = (ContainerManager) rpc.getProxy(
+              ContainerManager.class,
+              NetUtils.getConnectAddress(server), conf);
+          return proxy;
+        }
+      };
+
+    }
+  }
+
+  public class DummyContainerManager implements ContainerManager {
+
+    private ContainerStatus status = null;
+
+    @Override
+    public GetContainerStatusResponse getContainerStatus(
+        GetContainerStatusRequest request) throws YarnRemoteException {
+      GetContainerStatusResponse response = recordFactory
+          .newRecordInstance(GetContainerStatusResponse.class);
+      response.setStatus(status);
+      return response;
+    }
+
+    @Override
+    public StartContainerResponse startContainer(StartContainerRequest request)
+        throws YarnRemoteException {
+      ContainerLaunchContext container = request.getContainerLaunchContext();
+      StartContainerResponse response = recordFactory
+          .newRecordInstance(StartContainerResponse.class);
+      status = recordFactory.newRecordInstance(ContainerStatus.class);
+      try {
+        // make the thread sleep so it looks like it is not going to respond
+        Thread.sleep(15000);
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new UndeclaredThrowableException(e);
+      }
+      status.setState(ContainerState.RUNNING);
+      status.setContainerId(container.getContainerId());
+      status.setExitStatus(0);
+      return response;
+    }
+
+    @Override
+    public StopContainerResponse stopContainer(StopContainerRequest request)
+        throws YarnRemoteException {
+      Exception e = new Exception("Dummy function", new Exception(
+          "Dummy function cause"));
+      throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(
+          null).createYarnRemoteException(e);
+    }
+  }
+}


