tez-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r1457129 [33/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Date Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,777 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler2;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchFailed;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TestRecovery {
+
+  private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+  private static Path outputDir = new Path(new File("target", 
+      TestRecovery.class.getName()).getAbsolutePath() + 
+      Path.SEPARATOR + "out");
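+  // "part-r-00000" is the default output file name for the first reduce.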
+  private static String partFile = "part-r-00000";
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+
+  /**
+   * AM with 2 maps and 1 reduce. For the 1st map, one attempt fails, one
+   * attempt disappears completely because of a failed launch, one attempt gets
+   * killed and one attempt succeeds. The AM crashes after the first task
+   * finishes, then recovers completely and succeeds in the second generation.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testCrashed() throws Exception {
+
+    int runCount = 0;
+    long am1StartTimeEst = System.currentTimeMillis();
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    long jobStartTime = job.getReport().getStartTime();
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    // the reduce task should be in the RUNNING state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    /////////// Play some games with the TaskAttempts of the first task //////
+    //send the fail signal to the 1st map task attempt
+    app.getContext().getEventHandler()
+        .handle(new TaskAttemptEventFailRequest(task1Attempt1.getID(), null));
+    
+    app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
+
+    int timeOut = 0;
+    while (mapTask1.getAttempts().size() < 2 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    // Additional attempts run in a separate thread, so it is possible for the
+    // 3rd attempt to be created before this check.
+    Assert.assertTrue(mapTask1.getAttempts().size() >= 2);
+    Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    TaskAttempt task1Attempt2 = itr.next();
+    
+    // This attempt will automatically fail because of the way ContainerLauncher
+    // is setup
+    // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
+
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEventTerminated(task1Attempt2.getID()));
+    app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
+
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Assert.assertEquals(3, mapTask1.getAttempts().size());
+    itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    itr.next();
+    TaskAttempt task1Attempt3 = itr.next();
+    
+    app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+
+    //send the kill signal to the 1st map 3rd attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt3.getID(),
+            TaskAttemptEventType.TA_KILL_REQUEST));
+    
+    app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
+
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Assert.assertEquals(4, mapTask1.getAttempts().size());
+    itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    itr.next();
+    itr.next();
+    TaskAttempt task1Attempt4 = itr.next();
+    
+    app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 1st map 4th attempt
+    app.sendFinishToTaskAttempt(task1Attempt4.getID(),
+        TaskAttemptState.SUCCEEDED, true);
+
+    /////////// End of games with the TaskAttempts of the first task //////
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    long task1StartTime = mapTask1.getReport().getStartTime();
+    long task1FinishTime = mapTask1.getReport().getFinishTime();
+    
+    //stop the app
+    app.stop();
+
+    //rerun
+    //in rerun the 1st map will be recovered from previous run
+    long am2StartTimeEst = System.currentTimeMillis();
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+    
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    Assert.assertEquals("Job Start time not correct",
+        jobStartTime, job.getReport().getStartTime());
+    Assert.assertEquals("Task Start time not correct",
+        task1StartTime, mapTask1.getReport().getStartTime());
+    Assert.assertEquals("Task Finish time not correct",
+        task1FinishTime, mapTask1.getReport().getFinishTime());
+    Assert.assertEquals(2, job.getAMInfos().size());
+    int attemptNum = 1;
+    // Verify AMInfo
+    for (AMInfo amInfo : job.getAMInfos()) {
+      Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+          .getAttemptId());
+      Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+          .getApplicationAttemptId());
+      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    }
+    long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+    long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+    Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+        && am1StartTimeReal <= am2StartTimeEst);
+    Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+        && am2StartTimeReal <= System.currentTimeMillis());
+    // TODO Add verification of additional data from jobHistory - whatever was
+    // available in the failed attempt should be available here
+    
+    app.waitForAMExit();
+    validateAllContainersComplete(app.getContext());
+  }
+
+  @Test
+  public void testMultipleCrashes() throws Exception {
+
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    // the reduce task should be in the RUNNING state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    //send the done signal to the 1st map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+          task1Attempt1.getID(),
+          TaskAttemptEventType.TA_DONE));
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    // Crash the app
+    app.stop();
+
+    //rerun
+    //in rerun the 1st map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // Crash the app again.
+    app.stop();
+
+    //rerun
+    //in rerun the 1st and 2nd map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // The maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
+  @Test
+  public void testOutputRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+  
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+    
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
+    
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+
+  @Test
+  public void testOutputRecoveryMapsOnly() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+  
+    // write output corresponding to map1 (This is just to validate that it is
+    // not included in the output)
+    writeBadOutput(task1Attempt1, conf);
+    
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    //stop the app before the job completes.
+    app.stop();
+    
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask1 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
+        .next();
+
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task2Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for map task to complete
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task2Attempt1.getShufflePort());
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+    
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+
+  private void writeBadOutput(TaskAttempt attempt, Configuration conf)
+      throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+        TypeConverter.fromYarn(attempt.getID()));
+
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+
+    NullWritable nullWritable = NullWritable.get();
+    try {
+      theRecordWriter.write(key2, val2);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val2);
+      theRecordWriter.write(nullWritable, val1);
+      theRecordWriter.write(key1, nullWritable);
+      theRecordWriter.write(key2, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key1, val1);
+    } finally {
+      theRecordWriter.close(tContext);
+    }
+
+    OutputFormat outputFormat = ReflectionUtils.newInstance(
+        tContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+    committer.commitTask(tContext);
+  }
+
+  private void writeOutput(TaskAttempt attempt, Configuration conf)
+    throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
+        TypeConverter.fromYarn(attempt.getID()));
+    
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+    
+    NullWritable nullWritable = NullWritable.get();
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(tContext);
+    }
+    
+    OutputFormat outputFormat = ReflectionUtils.newInstance(
+        tContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+    committer.commitTask(tContext);
+  }
+
+  private void validateOutput() throws IOException {
+    File expectedFile = new File(new Path(outputDir, partFile).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = slurp(expectedFile);
+    Assert.assertEquals(output, expectedOutput.toString());
+  }
+
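+  // Reads an entire file into a String. Note: this assumes a single read()
+  // fills the buffer, which is adequate for the small files these tests write.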
+  public static String slurp(File f) throws IOException {
+    int len = (int) f.length();
+    byte[] buf = new byte[len];
+    FileInputStream in = new FileInputStream(f);
+    String contents = null;
+    try {
+      in.read(buf, 0, len);
+      contents = new String(buf, "UTF-8");
+    } finally {
+      in.close();
+    }
+    return contents;
+  }
+
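+  // Asserts that every container tracked by the AM reached the COMPLETED state.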
+  void validateAllContainersComplete(AppContext context) {
+    for (AMContainer container : context.getAllContainers().values()) {
+      Assert.assertEquals("Container: " + container.getContainerId()
+          + " in unexpected state: " + container.getState(),
+          AMContainerState.COMPLETED, container.getState());
+    }
+  }
+
+
+  static class MRAppWithHistory extends MRApp {
+    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(final AppContext context) {
+      MockContainerLauncher launcher = new MockContainerLauncher() {
+        @Override
+        public void handle(NMCommunicatorEvent event) {
+          if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
+            AMContainer amContainer = getContext().getAllContainers().get(
+                event.getContainerId());
+            TaskAttemptId attemptIdForContainer = amContainer
+                .getQueuedTaskAttempts().iterator().next();
+            // Pass everything except the 2nd attempt of the first task.
+            if (attemptIdForContainer.getId() != 1
+                || attemptIdForContainer.getTaskId().getId() != 0) {
+              super.handle(event);
+            } else {
+              context.getEventHandler().handle(
+                  new AMContainerEventLaunchFailed(event.getContainerId(),
+                      "Forcing failure"));
+            }
+          } else {
+            super.handle(event);
+          }
+        }
+      };
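+      // The mock shuffle port is asserted on by the recovery tests above.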
+      launcher.shufflePort = 5467;
+      return launcher;
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      JobHistoryEventHandler2 eventHandler = new JobHistoryEventHandler2(context, 
+          getStartCount());
+      return eventHandler;
+    }
+  }
+
+  public static void main(String[] arg) throws Exception {
+    TestRecovery test = new TestRecovery();
+    test.testCrashed();
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,883 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.TaskRuntimeEstimator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TestRuntimeEstimators {
+
+  private static int INITIAL_NUMBER_FREE_SLOTS = 600;
+  private static int MAP_SLOT_REQUIREMENT = 3;
+  // this has to be at least as large as the map slot requirement
+  private static int REDUCE_SLOT_REQUIREMENT = 4;
+  private static int MAP_TASKS = 200;
+  private static int REDUCE_TASKS = 150;
+
+  MockClock clock;
+
+  Job myJob;
+
+  AppContext myAppContext;
+
+  private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class);
+
+  private final AtomicInteger slotsInUse = new AtomicInteger(0);
+
+  AsyncDispatcher dispatcher;
+
+  DefaultSpeculator speculator;
+
+  TaskRuntimeEstimator estimator;
+
+  // This is a huge kluge.  The real implementations have a decent approach
+  private final AtomicInteger completedMaps = new AtomicInteger(0);
+  private final AtomicInteger completedReduces = new AtomicInteger(0);
+
+  private final AtomicInteger successfulSpeculations
+      = new AtomicInteger(0);
+  private final AtomicLong taskTimeSavedBySpeculation
+      = new AtomicLong(0L);
+  
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+  private void coreTestEstimator(TaskRuntimeEstimator testedEstimator,
+      int expectedSpeculations) {
+    estimator = testedEstimator;
+    clock = new MockClock();
+    dispatcher = new AsyncDispatcher();
+    myJob = null;
+    slotsInUse.set(0);
+    completedMaps.set(0);
+    completedReduces.set(0);
+    successfulSpeculations.set(0);
+    taskTimeSavedBySpeculation.set(0);
+
+    clock.advanceTime(1000);
+
+    Configuration conf = new Configuration();
+
+    myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
+    myJob = myAppContext.getAllJobs().values().iterator().next();
+
+    estimator.contextualize(conf, myAppContext);
+
+    speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
+
+    dispatcher.register(Speculator.EventType.class, speculator);
+
+    dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler());
+
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    speculator.init(conf);
+    speculator.start();
+
+    // Now that the plumbing is hooked up, we do the following:
+    //  do until all tasks are finished, ...
+    //  1: If we have spare capacity, assign as many map tasks as we can, then
+    //     assign as many reduce tasks as we can.  Note that an odd reduce
+    //     task might be started while there are still map tasks, because
+    //     map tasks take 3 slots and reduce tasks 4 slots.
+    //  2: Send a speculation event for every task attempt that's running;
+    //     note that new attempts might get started by the speculator
+
+    // discover undone tasks
+    int undoneMaps = MAP_TASKS;
+    int undoneReduces = REDUCE_TASKS;
+
+    // build a task sequence where all the maps precede any of the reduces
+    List<Task> allTasksSequence = new LinkedList<Task>();
+
+    allTasksSequence.addAll(myJob.getTasks(TaskType.MAP).values());
+    allTasksSequence.addAll(myJob.getTasks(TaskType.REDUCE).values());
+
+    while (undoneMaps + undoneReduces > 0) {
+      undoneMaps = 0; undoneReduces = 0;
+      // start all attempts which are new but for which there are enough slots
+      for (Task task : allTasksSequence) {
+        if (!task.isFinished()) {
+          if (task.getType() == TaskType.MAP) {
+            ++undoneMaps;
+          } else {
+            ++undoneReduces;
+          }
+        }
+        for (TaskAttempt attempt : task.getAttempts().values()) {
+          if (attempt.getState() == TaskAttemptState.NEW
+              && INITIAL_NUMBER_FREE_SLOTS - slotsInUse.get()
+                    >= taskTypeSlots(task.getType())) {
+            MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt;
+            SpeculatorEvent event
+                = new SpeculatorEvent(attempt.getID(), false, clock.getTime());
+            speculator.handle(event);
+            attemptImpl.startUp();
+          } else {
+            // If a task attempt is in progress we should send the news to
+            // the Speculator.
+            TaskAttemptStatus status = new TaskAttemptStatus();
+            status.id = attempt.getID();
+            status.progress = attempt.getProgress();
+            status.stateString = attempt.getState().name();
+            status.taskState = attempt.getState();
+            SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime());
+            speculator.handle(event);
+          }
+        }
+      }
+
+      long startTime = System.currentTimeMillis();
+
+      // drain the speculator event queue
+      while (!speculator.eventQueueEmpty()) {
+        Thread.yield();
+        if (System.currentTimeMillis() > startTime + 130000) {
+          return;
+        }
+      }
+
+      clock.advanceTime(1000L);
+
+      if (clock.getTime() % 10000L == 0L) {
+        speculator.scanForSpeculations();
+      }
+    }
+
+    Assert.assertEquals("We got the wrong number of successful speculations.",
+        expectedSpeculations, successfulSpeculations.get());
+  }
+
+  @Test
+  public void testLegacyEstimator() throws Exception {
+    TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
+    coreTestEstimator(specificEstimator, 3);
+  }
+
+  @Test
+  public void testExponentialEstimator() throws Exception {
+    TaskRuntimeEstimator specificEstimator
+        = new ExponentiallySmoothedTaskRuntimeEstimator();
+    coreTestEstimator(specificEstimator, 3);
+  }
+
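+  // Number of slots a task of the given type occupies in the simulated cluster.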
+  int taskTypeSlots(TaskType type) {
+    return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
+  }
+
+  class SpeculationRequestEventHandler implements EventHandler<TaskEvent> {
+
+    @Override
+    public void handle(TaskEvent event) {
+      TaskId taskID = event.getTaskID();
+      Task task = myJob.getTask(taskID);
+
+      Assert.assertEquals("Wrong event type",
+          TaskEventType.T_ADD_SPEC_ATTEMPT, event.getType());
+
+      System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID);
+
+      addAttempt(task);
+    }
+  }
+
+  void addAttempt(Task task) {
+    MyTaskImpl myTask = (MyTaskImpl) task;
+
+    myTask.addAttempt();
+  }
+
+  class MyTaskImpl implements Task {
+    private final TaskId taskID;
+    private final Map<TaskAttemptId, TaskAttempt> attempts
+        = new ConcurrentHashMap<TaskAttemptId, TaskAttempt>(4);
+
+    MyTaskImpl(JobId jobID, int index, TaskType type) {
+      taskID = recordFactory.newRecordInstance(TaskId.class);
+      taskID.setId(index);
+      taskID.setTaskType(type);
+      taskID.setJobId(jobID);
+    }
+
+    void addAttempt() {
+      TaskAttempt taskAttempt
+          = new MyTaskAttemptImpl(taskID, attempts.size(), clock);
+      TaskAttemptId taskAttemptID = taskAttempt.getID();
+
+      attempts.put(taskAttemptID, taskAttempt);
+
+      System.out.println("TLTRE.MyTaskImpl.addAttempt " + getID());
+
+      SpeculatorEvent event = new SpeculatorEvent(taskID, +1);
+      dispatcher.getEventHandler().handle(event);
+    }
+
+    @Override
+    public TaskId getID() {
+      return taskID;
+    }
+
+    @Override
+    public TaskReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Counters getCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public float getProgress() {
+      float result = 0.0F;
+
+      for (TaskAttempt attempt : attempts.values()) {
+        result = Math.max(result, attempt.getProgress());
+      }
+
+      return result;
+    }
+
+    @Override
+    public TaskType getType() {
+      return taskID.getTaskType();
+    }
+
+    @Override
+    public Map<TaskAttemptId, TaskAttempt> getAttempts() {
+      Map<TaskAttemptId, TaskAttempt> result
+          = new HashMap<TaskAttemptId, TaskAttempt>(attempts.size());
+      result.putAll(attempts);
+      return result;
+    }
+
+    @Override
+    public TaskAttempt getAttempt(TaskAttemptId attemptID) {
+      return attempts.get(attemptID);
+    }
+
+    @Override
+    public boolean isFinished() {
+      for (TaskAttempt attempt : attempts.values()) {
+        if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+
+    @Override
+    public boolean canCommit(TaskAttemptId taskAttemptID) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public TaskState getState() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public boolean needsWaitAfterOutputConsumable() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public TaskAttemptId getOutputConsumableAttempt() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+  }
+
+  class MyJobImpl implements Job {
+    private final JobId jobID;
+    private final Map<TaskId, Task> allTasks = new HashMap<TaskId, Task>();
+    private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
+    private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
+
+    MyJobImpl(JobId jobID, int numMaps, int numReduces) {
+      this.jobID = jobID;
+      for (int i = 0; i < numMaps; ++i) {
+        Task newTask = new MyTaskImpl(jobID, i, TaskType.MAP);
+        mapTasks.put(newTask.getID(), newTask);
+        allTasks.put(newTask.getID(), newTask);
+      }
+      for (int i = 0; i < numReduces; ++i) {
+        Task newTask = new MyTaskImpl(jobID, i, TaskType.REDUCE);
+        reduceTasks.put(newTask.getID(), newTask);
+        allTasks.put(newTask.getID(), newTask);
+      }
+
+      // give every task an attempt
+      for (Task task : allTasks.values()) {
+        MyTaskImpl myTaskImpl = (MyTaskImpl) task;
+        myTaskImpl.addAttempt();
+      }
+    }
+
+    @Override
+    public JobId getID() {
+      return jobID;
+    }
+
+    @Override
+    public JobState getState() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public JobReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public Counters getAllCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Map<TaskId, Task> getTasks() {
+      return allTasks;
+    }
+
+    @Override
+    public Map<TaskId, Task> getTasks(TaskType taskType) {
+      return taskType == TaskType.MAP ? mapTasks : reduceTasks;
+    }
+
+    @Override
+    public Task getTask(TaskId taskID) {
+      return allTasks.get(taskID);
+    }
+
+    @Override
+    public List<String> getDiagnostics() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int getCompletedMaps() {
+      return completedMaps.get();
+    }
+
+    @Override
+    public int getCompletedReduces() {
+      return completedReduces.get();
+    }
+
+    @Override
+    public TaskAttemptCompletionEvent[]
+            getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getQueueName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int getTotalMaps() {
+      return mapTasks.size();
+    }
+
+    @Override
+    public int getTotalReduces() {
+      return reduceTasks.size();
+    }
+
+    @Override
+    public boolean isUber() {
+      return false;
+    }
+
+    @Override
+    public boolean checkAccess(UserGroupInformation callerUGI,
+        JobACL jobOperation) {
+      return true;
+    }
+    
+    @Override
+    public String getUserName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Path getConfFile() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Map<JobACL, AccessControlList> getJobACLs() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public List<AMInfo> getAMInfos() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+    @Override
+    public Configuration loadConfFile() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Configuration getConf() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+  }
+
+  /*
+   * We follow the pattern of the real XxxImpl. We create a job and initialize
+   * it with a full suite of tasks, which in turn have one attempt each in the
+   * NEW state. Attempts transition only from NEW to RUNNING to SUCCEEDED.
+   */
+  class MyTaskAttemptImpl implements TaskAttempt {
+    private final TaskAttemptId myAttemptID;
+
+    long startMockTime = Long.MIN_VALUE;
+
+    long shuffleCompletedTime = Long.MAX_VALUE;
+
+    TaskAttemptState overridingState = TaskAttemptState.NEW;
+
+    MyTaskAttemptImpl(TaskId taskID, int index, Clock clock) {
+      myAttemptID = recordFactory.newRecordInstance(TaskAttemptId.class);
+      myAttemptID.setId(index);
+      myAttemptID.setTaskId(taskID);
+    }
+
+    void startUp() {
+      startMockTime = clock.getTime();
+      overridingState = null;
+
+      slotsInUse.addAndGet(taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
+
+      System.out.println("TLTRE.MyTaskAttemptImpl.startUp starting " + getID());
+
+      SpeculatorEvent event = new SpeculatorEvent(getID().getTaskId(), -1);
+      dispatcher.getEventHandler().handle(event);
+    }
+
+    @Override
+    public NodeId getNodeId() throws UnsupportedOperationException{
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public TaskAttemptId getID() {
+      return myAttemptID;
+    }
+
+    @Override
+    public TaskAttemptReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public List<String> getDiagnostics() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Counters getCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int getShufflePort() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
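+    // Simulated runtime (in seconds) for this attempt, varied by task index so
+    // that some tasks run long enough to become speculation candidates.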
+    private float getCodeRuntime() {
+      int taskIndex = myAttemptID.getTaskId().getId();
+      int attemptIndex = myAttemptID.getId();
+
+      float result = 200.0F;
+
+      switch (taskIndex % 4) {
+        case 0:
+          if (taskIndex % 40 == 0 && attemptIndex == 0) {
+            result = 600.0F;
+            break;
+          }
+
+          break;
+        case 2:
+          break;
+
+        case 1:
+          result = 150.0F;
+          break;
+
+        case 3:
+          result = 250.0F;
+          break;
+      }
+
+      return result;
+    }
+
+    private float getMapProgress() {
+      float runtime = getCodeRuntime();
+
+      return Math.min
+          ((float) (clock.getTime() - startMockTime) / (runtime * 1000.0F), 1.0F);
+    }
+
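+    // Reduce progress: the first half tracks the fraction of finished maps
+    // (the shuffle); the second half advances with the simulated runtime.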
+    private float getReduceProgress() {
+      Job job = myAppContext.getJob(myAttemptID.getTaskId().getJobId());
+      float runtime = getCodeRuntime();
+
+      Collection<Task> allMapTasks = job.getTasks(TaskType.MAP).values();
+
+      int numberMaps = allMapTasks.size();
+      int numberDoneMaps = 0;
+
+      for (Task mapTask : allMapTasks) {
+        if (mapTask.isFinished()) {
+          ++numberDoneMaps;
+        }
+      }
+
+      if (numberMaps == numberDoneMaps) {
+        shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime());
+
+        return Math.min
+            ((float) (clock.getTime() - shuffleCompletedTime)
+                        / (runtime * 2000.0F) + 0.5F,
+             1.0F);
+      } else {
+        return ((float) numberDoneMaps) / numberMaps * 0.5F;
+      }
+    }
+
+    // we compute progress from time and an algorithm now
+    @Override
+    public float getProgress() {
+      if (overridingState == TaskAttemptState.NEW) {
+        return 0.0F;
+      }
+      return myAttemptID.getTaskId().getTaskType() == TaskType.MAP ? getMapProgress() : getReduceProgress();
+    }
+
+    @Override
+    public TaskAttemptState getState() {
+      if (overridingState != null) {
+        return overridingState;
+      }
+      TaskAttemptState result
+          = getProgress() < 1.0F ? TaskAttemptState.RUNNING : TaskAttemptState.SUCCEEDED;
+
+      if (result == TaskAttemptState.SUCCEEDED) {
+        overridingState = TaskAttemptState.SUCCEEDED;
+
+        System.out.println("MyTaskAttemptImpl.getState() -- attempt " + myAttemptID + " finished.");
+
+        slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
+
+        (myAttemptID.getTaskId().getTaskType() == TaskType.MAP
+            ? completedMaps : completedReduces).getAndIncrement();
+
+        // check for a spectacularly successful speculation
+        TaskId taskID = myAttemptID.getTaskId();
+
+        Task task = myJob.getTask(taskID);
+
+        for (TaskAttempt otherAttempt : task.getAttempts().values()) {
+          if (otherAttempt != this
+              && otherAttempt.getState() == TaskAttemptState.RUNNING) {
+            // we had two instances running.  Try to determine how much
+            //  we might have saved by speculation
+            if (getID().getId() > otherAttempt.getID().getId()) {
+              // the speculation won
+              successfulSpeculations.getAndIncrement();
+              float hisProgress = otherAttempt.getProgress();
+              long hisStartTime = ((MyTaskAttemptImpl)otherAttempt).startMockTime;
+              System.out.println("TLTRE:A speculation finished at time "
+                  + clock.getTime()
+                  + ".  The stalled attempt is at " + (hisProgress * 100.0)
+                  + "% progress, and it started at "
+                  + hisStartTime + ", which is "
+                  + (clock.getTime() - hisStartTime) + " ago.");
+              long originalTaskEndEstimate
+                  = (hisStartTime
+                      + estimator.estimatedRuntime(otherAttempt.getID()));
+              System.out.println(
+                  "TLTRE: We would have expected the original attempt to take "
+                  + estimator.estimatedRuntime(otherAttempt.getID())
+                  + ", finishing at " + originalTaskEndEstimate);
+              long estimatedSavings = originalTaskEndEstimate - clock.getTime();
+              taskTimeSavedBySpeculation.addAndGet(estimatedSavings);
+              System.out.println("TLTRE: The task is " + task.getID());
+              slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
+              ((MyTaskAttemptImpl)otherAttempt).overridingState
+                  = TaskAttemptState.KILLED;
+            } else {
+              System.out.println(
+                  "TLTRE: The normal attempt beat the speculation in "
+                  + task.getID());
+            }
+          }
+        }
+      }
+
+      return result;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return getProgress() == 1.0F;
+    }
+
+    @Override
+    public ContainerId getAssignedContainerID() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getNodeHttpAddress() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+    @Override
+    public String getNodeRackName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long getLaunchTime() {
+      return startMockTime;
+    }
+
+    @Override
+    public long getFinishTime() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long getShuffleFinishTime() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long getSortFinishTime() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getAssignedContainerMgrAddress() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+
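+  /**
+   * A manually advanced clock for deterministic tests. Illustrative use
+   * (hypothetical values): after {@code clock.advanceTime(1000)}, every
+   * component sharing this clock observes {@code getTime() == 1000}.
+   */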
+  static class MockClock implements Clock {
+    private long currentTime = 0;
+
+    public long getTime() {
+      return currentTime;
+    }
+
+    void setMeasuredTime(long newTime) {
+      currentTime = newTime;
+    }
+
+    void advanceTime(long increment) {
+      currentTime += increment;
+    }
+  }
+
+  class MyAppMaster extends CompositeService {
+    final Clock clock;
+
+    public MyAppMaster(Clock clock) {
+      super(MyAppMaster.class.getName());
+      if (clock == null) {
+        clock = new SystemClock();
+      }
+      this.clock = clock;
+      LOG.info("Created MyAppMaster");
+    }
+  }
+
+  class MyAppContext implements AppContext {
+    private final ApplicationAttemptId myAppAttemptID;
+    private final ApplicationId myApplicationID;
+    private final JobId myJobID;
+    private final Map<JobId, Job> allJobs;
+
+    MyAppContext(int numberMaps, int numberReduces) {
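+      // Wire up a minimal ApplicationId -> ApplicationAttemptId -> JobId chain.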
+      myApplicationID = recordFactory.newRecordInstance(ApplicationId.class);
+      myApplicationID.setClusterTimestamp(clock.getTime());
+      myApplicationID.setId(1);
+
+      myAppAttemptID = recordFactory
+          .newRecordInstance(ApplicationAttemptId.class);
+      myAppAttemptID.setApplicationId(myApplicationID);
+      myAppAttemptID.setAttemptId(0);
+      myJobID = recordFactory.newRecordInstance(JobId.class);
+      myJobID.setAppId(myApplicationID);
+
+      Job myJob
+          = new MyJobImpl(myJobID, numberMaps, numberReduces);
+
+      allJobs = Collections.singletonMap(myJobID, myJob);
+    }
+
+    @Override
+    public ApplicationAttemptId getApplicationAttemptId() {
+      return myAppAttemptID;
+    }
+
+    @Override
+    public ApplicationId getApplicationID() {
+      return myApplicationID;
+    }
+
+    @Override
+    public Job getJob(JobId jobID) {
+      return allJobs.get(jobID);
+    }
+
+    @Override
+    public Map<JobId, Job> getAllJobs() {
+      return allJobs;
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    @Override
+    public CharSequence getUser() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Clock getClock() {
+      return clock;
+    }
+
+    @Override
+    public String getApplicationName() {
+      return null;
+    }
+
+    @Override
+    public long getStartTime() {
+      return 0;
+    }
+
+    @Override
+    public ClusterInfo getClusterInfo() {
+      return new ClusterInfo();
+    }
+
+    @Override
+    public AMContainerMap getAllContainers() {
+      // Not exercised by these tests.
+      return null;
+    }
+
+    @Override
+    public AMNodeMap getAllNodes() {
+      // Not exercised by these tests.
+      return null;
+    }
+
+    @Override
+    public Map<ApplicationAccessType, String> getApplicationACLs() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,187 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+
+/**
+ * Make sure that the job staging directory cleanup happens.
+ */
+public class TestStagingCleanup {
+
+  private Configuration conf = new Configuration();
+  private FileSystem fs;
+  private String stagingJobDir = "tmpJobDir";
+  private Path stagingJobPath = new Path(stagingJobDir);
+  private static final RecordFactory recordFactory = RecordFactoryProvider.
+      getRecordFactory(null);
+
+  @Test
+  public void testDeletionOfStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    // The mocked FileSystem records the delete() call verified below.
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+        ApplicationAttemptId.class);
+    attemptId.setAttemptId(0);
+    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    appId.setClusterTimestamp(System.currentTimeMillis());
+    appId.setId(0);
+    attemptId.setApplicationId(appId);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    MRAppMaster appMaster = new TestMRApp(attemptId);
+    appMaster.init(conf);
+    EventHandler<JobFinishEvent> handler =
+        appMaster.createJobFinishEventHandler();
+    handler.handle(new JobFinishEvent(jobid));
+    verify(fs).delete(stagingJobPath, true);
+  }
+
+  private class TestMRApp extends MRAppMaster {
+
+    public TestMRApp(ApplicationAttemptId applicationAttemptId) {
+      super(applicationAttemptId, BuilderUtils.newContainerId(
+          applicationAttemptId, 1), "testhost", 2222, 3333, System
+          .currentTimeMillis());
+    }
+
+    @Override
+    protected FileSystem getFileSystem(Configuration conf) {
+      return fs;
+    }
+
+    @Override
+    protected void sysexit() {
+      // No-op: keep the test JVM alive.
+    }
+
+    @Override
+    public Configuration getConfig() {
+      return conf;
+    }
+  }
+
+  private final class MRAppTestCleanup extends MRApp {
+    boolean stoppedContainerAllocator;
+    boolean cleanedBeforeContainerAllocatorStopped;
+
+    public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+      stoppedContainerAllocator = false;
+      cleanedBeforeContainerAllocatorStopped = false;
+    }
+
+    @Override
+    protected Job createJob(Configuration conf) {
+      UserGroupInformation currentUser = null;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+      Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+          getDispatcher().getEventHandler(),
+          getTaskAttemptListener(), getContext().getClock(),
+          getCommitter(), isNewApiCommitter(),
+          currentUser.getUserName(), getTaskHeartbeatHandler(), getContext());
+      ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+      getDispatcher().register(JobFinishEvent.Type.class,
+          createJobFinishEventHandler());
+
+      return newJob;
+    }
+
+    @Override
+    protected ContainerRequestor createContainerRequestor(
+        ClientService clientService, AppContext appContext) {
+      return new TestCleanupContainerRequestor(clientService, appContext);
+    }
+
+    private class TestCleanupContainerRequestor extends MRAppContainerRequestor {
+      public TestCleanupContainerRequestor(ClientService clientService,
+          AppContext context) {
+        super(clientService, context);
+      }
+
+      @Override
+      public synchronized void stop() {
+        stoppedContainerAllocator = true;
+        super.stop();
+      }
+    }
+
+    @Override
+    public void cleanupStagingDir() throws IOException {
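+      // Capture the ordering under test: cleanup must run while the
+      // container allocator is still up, i.e. before stop() flips the flag.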
+      cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
+    }
+
+    @Override
+    protected void sysexit() {
+      // No-op so the test JVM keeps running after the job finishes.
+    }
+  }
+
+  @Test
+  public void testStagingCleanupOrder() throws Exception {
+    MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
+        this.getClass().getName(), true);
+    JobImpl job = (JobImpl)app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
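+    // Poll up to 20 seconds for the ordering flag recorded in cleanupStagingDir().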
+    int waitTime = 20 * 1000;
+    while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
+      Thread.sleep(100);
+      waitTime -= 100;
+    }
+    Assert.assertTrue("Staging directory not cleaned before notifying RM",
+        app.cleanedBeforeContainerAllocatorStopped);
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestTaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestTaskHeartbeatHandler.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestTaskHeartbeatHandler.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestTaskHeartbeatHandler.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,73 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
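+/**
+ * Verifies that TaskHeartbeatHandler reports attempts that stop sending
+ * heartbeats within the configured task timeout.
+ */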
+public class TestTaskHeartbeatHandler {
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testTimeout() throws InterruptedException {
+    EventHandler mockHandler = mock(EventHandler.class);
+    Clock clock = new SystemClock();
+    AppContext context = mock(AppContext.class);
+    when(context.getEventHandler()).thenReturn(mockHandler);
+    when(context.getClock()).thenReturn(clock);
+
+    TaskHeartbeatHandler hb = new TaskHeartbeatHandler(context, 1);
+
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.TASK_TIMEOUT, 10); // 10 ms
+    conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); // 10 ms
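+    // With a 10 ms timeout checked every 10 ms, an attempt that never
+    // heartbeats should be flagged well within the 100 ms sleep below.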
+    
+    hb.init(conf);
+    hb.start();
+    try {
+      ApplicationId appId = BuilderUtils.newApplicationId(0L, 5);
+      JobId jobId = MRBuilderUtils.newJobId(appId, 4);
+      TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
+      TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
+      hb.register(taid);
+      Thread.sleep(100);
+      // The attempt never heartbeats, so by now the handler should have
+      // dispatched events for the timed-out attempt.
+      verify(mockHandler, times(2)).handle(any(Event.class));
+    } finally {
+      hb.stop();
+    }
+  }
+
+}


