hadoop-mapreduce-commits mailing list archives

From ss...@apache.org
Subject svn commit: r1376283 [18/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/s...
Date Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,752 @@
+///**
+//* Licensed to the Apache Software Foundation (ASF) under one
+//* or more contributor license agreements.  See the NOTICE file
+//* distributed with this work for additional information
+//* regarding copyright ownership.  The ASF licenses this file
+//* to you under the Apache License, Version 2.0 (the
+//* "License"); you may not use this file except in compliance
+//* with the License.  You may obtain a copy of the License at
+//*
+//*     http://www.apache.org/licenses/LICENSE-2.0
+//*
+//* Unless required by applicable law or agreed to in writing, software
+//* distributed under the License is distributed on an "AS IS" BASIS,
+//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//* See the License for the specific language governing permissions and
+//* limitations under the License.
+//*/
+//
+//package org.apache.hadoop.mapreduce.v2.app2;
+//
+//import java.io.File;
+//import java.io.FileInputStream;
+//import java.io.IOException;
+//import java.util.Iterator;
+//
+//import junit.framework.Assert;
+//
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.io.NullWritable;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.MRJobConfig;
+//import org.apache.hadoop.mapreduce.OutputCommitter;
+//import org.apache.hadoop.mapreduce.OutputFormat;
+//import org.apache.hadoop.mapreduce.RecordWriter;
+//import org.apache.hadoop.mapreduce.TaskAttemptContext;
+//import org.apache.hadoop.mapreduce.TypeConverter;
+//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler2;
+//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+//import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+//import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+//import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+//import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+//import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+//import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+//import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherEvent;
+//import org.apache.hadoop.util.ReflectionUtils;
+//import org.apache.hadoop.yarn.event.EventHandler;
+//import org.junit.Test;
+//
+//@SuppressWarnings({"unchecked", "rawtypes"})
+//public class TestRecovery {
+//
+//  private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+//  private static Path outputDir = new Path(new File("target", 
+//      TestRecovery.class.getName()).getAbsolutePath() + 
+//      Path.SEPARATOR + "out");
+//  private static String partFile = "part-r-00000";
+//  private Text key1 = new Text("key1");
+//  private Text key2 = new Text("key2");
+//  private Text val1 = new Text("val1");
+//  private Text val2 = new Text("val2");
+//
+//  /**
+//   * AM with 2 maps and 1 reduce. For the 1st map, one attempt fails, one
+//   * attempt disappears completely because of a failed launch, one attempt
+//   * gets killed, and one attempt succeeds. The AM crashes after the first
+//   * task finishes, then recovers completely and succeeds in the second
+//   * generation.
+//   * 
+//   * @throws Exception
+//   */
+//  @Test
+//  public void testCrashed() throws Exception {
+//
+//    int runCount = 0;
+//    long am1StartTimeEst = System.currentTimeMillis();
+//    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
+//    Configuration conf = new Configuration();
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    Job job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    long jobStartTime = job.getReport().getStartTime();
+//    //all maps would be running
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    Iterator<Task> it = job.getTasks().values().iterator();
+//    Task mapTask1 = it.next();
+//    Task mapTask2 = it.next();
+//    Task reduceTask = it.next();
+//    
+//    // all maps must be running
+//    app.waitForState(mapTask1, TaskState.RUNNING);
+//    app.waitForState(mapTask2, TaskState.RUNNING);
+//    
+//    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+//    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+//    
+//    //before sending the TA_DONE event, make sure the attempt has come to 
+//    //RUNNING state
+//    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+//    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+//    
+//    // the reduce must be in RUNNING state
+//    Assert.assertEquals("Reduce Task state not correct",
+//        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+//
+//    /////////// Play some games with the TaskAttempts of the first task //////
+//    //send the fail signal to the 1st map task attempt
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            task1Attempt1.getID(),
+//            TaskAttemptEventType.TA_FAILMSG));
+//    
+//    app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
+//
+//    int timeOut = 0;
+//    while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
+//      Thread.sleep(2000);
+//      LOG.info("Waiting for next attempt to start");
+//    }
+//    Assert.assertEquals(2, mapTask1.getAttempts().size());
+//    Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
+//    itr.next();
+//    TaskAttempt task1Attempt2 = itr.next();
+//    
+//    // This attempt will automatically fail because of the way ContainerLauncher
+//    // is set up
+//    // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
+//    app.getContext().getEventHandler().handle(
+//      new TaskAttemptEvent(task1Attempt2.getID(),
+//        TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+//    app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
+//
+//    timeOut = 0;
+//    while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
+//      Thread.sleep(2000);
+//      LOG.info("Waiting for next attempt to start");
+//    }
+//    Assert.assertEquals(3, mapTask1.getAttempts().size());
+//    itr = mapTask1.getAttempts().values().iterator();
+//    itr.next();
+//    itr.next();
+//    TaskAttempt task1Attempt3 = itr.next();
+//    
+//    app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+//
+//    //send the kill signal to the 1st map 3rd attempt
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            task1Attempt3.getID(),
+//            TaskAttemptEventType.TA_KILL));
+//    
+//    app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
+//
+//    timeOut = 0;
+//    while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
+//      Thread.sleep(2000);
+//      LOG.info("Waiting for next attempt to start");
+//    }
+//    Assert.assertEquals(4, mapTask1.getAttempts().size());
+//    itr = mapTask1.getAttempts().values().iterator();
+//    itr.next();
+//    itr.next();
+//    itr.next();
+//    TaskAttempt task1Attempt4 = itr.next();
+//    
+//    app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
+//
+//    //send the done signal to the 1st map 4th attempt
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            task1Attempt4.getID(),
+//            TaskAttemptEventType.TA_DONE));
+//
+//    /////////// End of games with the TaskAttempts of the first task //////
+//
+//    //wait for first map task to complete
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//    long task1StartTime = mapTask1.getReport().getStartTime();
+//    long task1FinishTime = mapTask1.getReport().getFinishTime();
+//    
+//    //stop the app
+//    app.stop();
+//
+//    //rerun
+//    //in rerun the 1st map will be recovered from previous run
+//    long am2StartTimeEst = System.currentTimeMillis();
+//    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
+//    conf = new Configuration();
+//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    //all maps would be running
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    it = job.getTasks().values().iterator();
+//    mapTask1 = it.next();
+//    mapTask2 = it.next();
+//    reduceTask = it.next();
+//    
+//    // first map will be recovered, no need to send done
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//    
+//    app.waitForState(mapTask2, TaskState.RUNNING);
+//    
+//    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+//    //before sending the TA_DONE event, make sure the attempt has come to 
+//    //RUNNING state
+//    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+//    
+//    //send the done signal to the 2nd map task
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            mapTask2.getAttempts().values().iterator().next().getID(),
+//            TaskAttemptEventType.TA_DONE));
+//    
+//    //wait to get it completed
+//    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+//    
+//    //wait for reduce to be running before sending done
+//    app.waitForState(reduceTask, TaskState.RUNNING);
+//    //send the done signal to the reduce
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            reduceTask.getAttempts().values().iterator().next().getID(),
+//            TaskAttemptEventType.TA_DONE));
+//    
+//    app.waitForState(job, JobState.SUCCEEDED);
+//    app.verifyCompleted();
+//    Assert.assertEquals("Job Start time not correct",
+//        jobStartTime, job.getReport().getStartTime());
+//    Assert.assertEquals("Task Start time not correct",
+//        task1StartTime, mapTask1.getReport().getStartTime());
+//    Assert.assertEquals("Task Finish time not correct",
+//        task1FinishTime, mapTask1.getReport().getFinishTime());
+//    Assert.assertEquals(2, job.getAMInfos().size());
+//    int attemptNum = 1;
+//    // Verify AMInfo
+//    for (AMInfo amInfo : job.getAMInfos()) {
+//      Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+//          .getAttemptId());
+//      Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+//          .getApplicationAttemptId());
+//      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+//      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+//      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+//    }
+//    long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+//    long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+//    Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+//        && am1StartTimeReal <= am2StartTimeEst);
+//    Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+//        && am2StartTimeReal <= System.currentTimeMillis());
+//    // TODO Add verification of additional data from jobHistory - whatever was
+//    // available in the failed attempt should be available here
+//  }
+//
+//  @Test
+//  public void testMultipleCrashes() throws Exception {
+//
+//    int runCount = 0;
+//    MRApp app =
+//        new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
+//          ++runCount);
+//    Configuration conf = new Configuration();
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    Job job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    //all maps would be running
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    Iterator<Task> it = job.getTasks().values().iterator();
+//    Task mapTask1 = it.next();
+//    Task mapTask2 = it.next();
+//    Task reduceTask = it.next();
+//    
+//    // all maps must be running
+//    app.waitForState(mapTask1, TaskState.RUNNING);
+//    app.waitForState(mapTask2, TaskState.RUNNING);
+//    
+//    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+//    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+//    
+//    //before sending the TA_DONE event, make sure the attempt has come to 
+//    //RUNNING state
+//    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+//    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+//    
+//    // the reduce must be in RUNNING state
+//    Assert.assertEquals("Reduce Task state not correct",
+//        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+//
+//    //send the done signal to the 1st map
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//          task1Attempt1.getID(),
+//          TaskAttemptEventType.TA_DONE));
+//
+//    //wait for first map task to complete
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//    
+//    // Crash the app
+//    app.stop();
+//
+//    //rerun
+//    //in rerun the 1st map will be recovered from previous run
+//    app =
+//        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+//          ++runCount);
+//    conf = new Configuration();
+//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    //all maps would be running
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    it = job.getTasks().values().iterator();
+//    mapTask1 = it.next();
+//    mapTask2 = it.next();
+//    reduceTask = it.next();
+//    
+//    // first map will be recovered, no need to send done
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//    
+//    app.waitForState(mapTask2, TaskState.RUNNING);
+//    
+//    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+//    //before sending the TA_DONE event, make sure the attempt has come to 
+//    //RUNNING state
+//    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+//    
+//    //send the done signal to the 2nd map task
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            mapTask2.getAttempts().values().iterator().next().getID(),
+//            TaskAttemptEventType.TA_DONE));
+//    
+//    //wait to get it completed
+//    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+//
+//    // Crash the app again.
+//    app.stop();
+//
+//    //rerun
+//    //in rerun the 1st and 2nd map will be recovered from previous run
+//    app =
+//        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+//          ++runCount);
+//    conf = new Configuration();
+//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    //all maps would be running
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    it = job.getTasks().values().iterator();
+//    mapTask1 = it.next();
+//    mapTask2 = it.next();
+//    reduceTask = it.next();
+//    
+//    // The maps will be recovered, no need to send done
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+//
+//    //wait for reduce to be running before sending done
+//    app.waitForState(reduceTask, TaskState.RUNNING);
+//    //send the done signal to the reduce
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            reduceTask.getAttempts().values().iterator().next().getID(),
+//            TaskAttemptEventType.TA_DONE));
+//    
+//    app.waitForState(job, JobState.SUCCEEDED);
+//    app.verifyCompleted();
+//  }
+//
+//  @Test
+//  public void testOutputRecovery() throws Exception {
+//    int runCount = 0;
+//    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+//        true, ++runCount);
+//    Configuration conf = new Configuration();
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    Job job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    Iterator<Task> it = job.getTasks().values().iterator();
+//    Task mapTask1 = it.next();
+//    Task reduceTask1 = it.next();
+//    
+//    // the map must be running
+//    app.waitForState(mapTask1, TaskState.RUNNING);
+//    
+//    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+//        .next();
+//    
+//    //before sending the TA_DONE event, make sure the attempt has come to 
+//    //RUNNING state
+//    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+//  
+//    //send the done signal to the map
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            task1Attempt1.getID(),
+//            TaskAttemptEventType.TA_DONE));
+//    
+//    //wait for map task to complete
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//
+//    // Verify the shuffle-port
+//    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+//    
+//    app.waitForState(reduceTask1, TaskState.RUNNING);
+//    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+//    
+//    // write output corresponding to reduce1
+//    writeOutput(reduce1Attempt1, conf);
+//    
+//    //send the done signal to the 1st reduce
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            reduce1Attempt1.getID(),
+//            TaskAttemptEventType.TA_DONE));
+//
+//    //wait for first reduce task to complete
+//    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+//    
+//    //stop the app before the job completes.
+//    app.stop();
+//
+//    //rerun
+//    //in rerun the map will be recovered from previous run
+//    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+//        ++runCount);
+//    conf = new Configuration();
+//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    it = job.getTasks().values().iterator();
+//    mapTask1 = it.next();
+//    reduceTask1 = it.next();
+//    Task reduceTask2 = it.next();
+//    
+//    // map will be recovered, no need to send done
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//
+//    // Verify the shuffle-port after recovery
+//    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+//    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+//    
+//    // first reduce will be recovered, no need to send done
+//    app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
+//    
+//    app.waitForState(reduceTask2, TaskState.RUNNING);
+//    
+//    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+//        .iterator().next();
+//    //before sending the TA_DONE event, make sure the attempt has come to 
+//    //RUNNING state
+//    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+//    
+//    //send the done signal to the 2nd reduce task
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            reduce2Attempt.getID(),
+//            TaskAttemptEventType.TA_DONE));
+//    
+//    //wait to get it completed
+//    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+//    
+//    app.waitForState(job, JobState.SUCCEEDED);
+//    app.verifyCompleted();
+//    validateOutput();
+//  }
+//
+//  @Test
+//  public void testOutputRecoveryMapsOnly() throws Exception {
+//    int runCount = 0;
+//    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+//        true, ++runCount);
+//    Configuration conf = new Configuration();
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    Job job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    Iterator<Task> it = job.getTasks().values().iterator();
+//    Task mapTask1 = it.next();
+//    Task mapTask2 = it.next();
+//    Task reduceTask1 = it.next();
+//    
+//    // the first map must be running
+//    app.waitForState(mapTask1, TaskState.RUNNING);
+//    
+//    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+//        .next();
+//    
+//    //before sending the TA_DONE event, make sure the attempt has come to 
+//    //RUNNING state
+//    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+//  
+//    // write output corresponding to map1 (this is just to validate that it is
+//    // not included in the output)
+//    writeBadOutput(task1Attempt1, conf);
+//    
+//    //send the done signal to the map
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            task1Attempt1.getID(),
+//            TaskAttemptEventType.TA_DONE));
+//    
+//    //wait for map task to complete
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//
+//    // Verify the shuffle-port
+//    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+//
+//    //stop the app before the job completes.
+//    app.stop();
+//    
+//    //rerun
+//    //in rerun the map will be recovered from previous run
+//    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+//        ++runCount);
+//    conf = new Configuration();
+//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+//    conf.setBoolean("mapred.mapper.new-api", true);
+//    conf.setBoolean("mapred.reducer.new-api", true);
+//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+//    job = app.submit(conf);
+//    app.waitForState(job, JobState.RUNNING);
+//    Assert.assertEquals("No of tasks not correct",
+//       3, job.getTasks().size());
+//    it = job.getTasks().values().iterator();
+//    mapTask1 = it.next();
+//    mapTask2 = it.next();
+//    reduceTask1 = it.next();
+//    
+//    // map will be recovered, no need to send done
+//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+//
+//    // Verify the shuffle-port after recovery
+//    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+//    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+//    
+//    app.waitForState(mapTask2, TaskState.RUNNING);
+//    
+//    TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
+//    .next();
+//
+//    //before sending the TA_DONE event, make sure the attempt has come to 
+//    //RUNNING state
+//    app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
+//
+//    //send the done signal to the map
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            task2Attempt1.getID(),
+//            TaskAttemptEventType.TA_DONE));
+//
+//    //wait for map task to complete
+//    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+//
+//    // Verify the shuffle-port
+//    Assert.assertEquals(5467, task2Attempt1.getShufflePort());
+//    
+//    app.waitForState(reduceTask1, TaskState.RUNNING);
+//    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+//    
+//    // write output corresponding to reduce1
+//    writeOutput(reduce1Attempt1, conf);
+//    
+//    //send the done signal to the 1st reduce
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEvent(
+//            reduce1Attempt1.getID(),
+//            TaskAttemptEventType.TA_DONE));
+//
+//    //wait for first reduce task to complete
+//    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+//    
+//    app.waitForState(job, JobState.SUCCEEDED);
+//    app.verifyCompleted();
+//    validateOutput();
+//  }
+//  
+//  private void writeBadOutput(TaskAttempt attempt, Configuration conf)
+//    throws Exception {
+//    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
+//        TypeConverter.fromYarn(attempt.getID()));
+//    
+//    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+//    RecordWriter theRecordWriter = theOutputFormat
+//        .getRecordWriter(tContext);
+//    
+//    NullWritable nullWritable = NullWritable.get();
+//    try {
+//      theRecordWriter.write(key2, val2);
+//      theRecordWriter.write(null, nullWritable);
+//      theRecordWriter.write(null, val2);
+//      theRecordWriter.write(nullWritable, val1);
+//      theRecordWriter.write(key1, nullWritable);
+//      theRecordWriter.write(key2, null);
+//      theRecordWriter.write(null, null);
+//      theRecordWriter.write(key1, val1);
+//    } finally {
+//      theRecordWriter.close(tContext);
+//    }
+//    
+//    OutputFormat outputFormat = ReflectionUtils.newInstance(
+//        tContext.getOutputFormatClass(), conf);
+//    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+//    committer.commitTask(tContext);
+//  }
+//  
+//  private void writeOutput(TaskAttempt attempt, Configuration conf)
+//    throws Exception {
+//    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
+//        TypeConverter.fromYarn(attempt.getID()));
+//    
+//    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+//    RecordWriter theRecordWriter = theOutputFormat
+//        .getRecordWriter(tContext);
+//    
+//    NullWritable nullWritable = NullWritable.get();
+//    try {
+//      theRecordWriter.write(key1, val1);
+//      theRecordWriter.write(null, nullWritable);
+//      theRecordWriter.write(null, val1);
+//      theRecordWriter.write(nullWritable, val2);
+//      theRecordWriter.write(key2, nullWritable);
+//      theRecordWriter.write(key1, null);
+//      theRecordWriter.write(null, null);
+//      theRecordWriter.write(key2, val2);
+//    } finally {
+//      theRecordWriter.close(tContext);
+//    }
+//    
+//    OutputFormat outputFormat = ReflectionUtils.newInstance(
+//        tContext.getOutputFormatClass(), conf);
+//    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+//    committer.commitTask(tContext);
+//  }
+//
+//  private void validateOutput() throws IOException {
+//    File expectedFile = new File(new Path(outputDir, partFile).toString());
+//    StringBuffer expectedOutput = new StringBuffer();
+//    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+//    expectedOutput.append(val1).append("\n");
+//    expectedOutput.append(val2).append("\n");
+//    expectedOutput.append(key2).append("\n");
+//    expectedOutput.append(key1).append("\n");
+//    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+//    String output = slurp(expectedFile);
+//    Assert.assertEquals(expectedOutput.toString(), output);
+//  }
+//
+//  public static String slurp(File f) throws IOException {
+//    int len = (int) f.length();
+//    byte[] buf = new byte[len];
+//    FileInputStream in = new FileInputStream(f);
+//    String contents = null;
+//    try {
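+//      // Note: a single read() is assumed to fill the whole buffer, which
+//      // is adequate for the small part files these tests produce.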
+//      in.read(buf, 0, len);
+//      contents = new String(buf, "UTF-8");
+//    } finally {
+//      in.close();
+//    }
+//    return contents;
+//  }
+//
+//
+//  static class MRAppWithHistory extends MRApp {
+//    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+//        String testName, boolean cleanOnStart, int startCount) {
+//      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+//    }
+//
+//    @Override
+//    protected ContainerLauncher createContainerLauncher(AppContext context) {
+//      MockContainerLauncher launcher = new MockContainerLauncher() {
+//        @Override
+//        public void handle(ContainerLauncherEvent event) {
+//          TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+//          // Pass everything except the 2nd attempt of the first task.
+//          if (taskAttemptID.getId() != 1
+//              || taskAttemptID.getTaskId().getId() != 0) {
+//            super.handle(event);
+//          }
+//        }
+//      };
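+//      // The fixed shuffle port lets the tests assert that the port value
+//      // survives AM recovery.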
+//      launcher.shufflePort = 5467;
+//      return launcher;
+//    }
+//
+//    @Override
+//    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+//        AppContext context) {
+//      JobHistoryEventHandler2 eventHandler = new JobHistoryEventHandler2(context, 
+//          getStartCount());
+//      return eventHandler;
+//    }
+//  }
+//
+//  public static void main(String[] arg) throws Exception {
+//    TestRecovery test = new TestRecovery();
+//    test.testCrashed();
+//  }
+//}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRuntimeEstimators.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,881 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNode;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.TaskRuntimeEstimator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TestRuntimeEstimators {
+
+  private static int INITIAL_NUMBER_FREE_SLOTS = 600;
+  private static int MAP_SLOT_REQUIREMENT = 3;
+  // this has to be at least as much as map slot requirement
+  private static int REDUCE_SLOT_REQUIREMENT = 4;
+  private static int MAP_TASKS = 200;
+  private static int REDUCE_TASKS = 150;
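+  // With 600 free slots, at most 200 maps (3 slots each) or 150 reduces
+  // (4 slots each) can run concurrently.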
+
+  MockClock clock;
+
+  Job myJob;
+
+  AppContext myAppContext;
+
+  private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class);
+
+  private final AtomicInteger slotsInUse = new AtomicInteger(0);
+
+  AsyncDispatcher dispatcher;
+
+  DefaultSpeculator speculator;
+
+  TaskRuntimeEstimator estimator;
+
+  // This is a huge kluge.  The real implementations have a decent approach
+  private final AtomicInteger completedMaps = new AtomicInteger(0);
+  private final AtomicInteger completedReduces = new AtomicInteger(0);
+
+  private final AtomicInteger successfulSpeculations
+      = new AtomicInteger(0);
+  private final AtomicLong taskTimeSavedBySpeculation
+      = new AtomicLong(0L);
+  
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+  private void coreTestEstimator(TaskRuntimeEstimator testedEstimator,
+      int expectedSpeculations) {
+    estimator = testedEstimator;
+    clock = new MockClock();
+    dispatcher = new AsyncDispatcher();
+    myJob = null;
+    slotsInUse.set(0);
+    completedMaps.set(0);
+    completedReduces.set(0);
+    successfulSpeculations.set(0);
+    taskTimeSavedBySpeculation.set(0);
+
+    clock.advanceTime(1000);
+
+    Configuration conf = new Configuration();
+
+    myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
+    myJob = myAppContext.getAllJobs().values().iterator().next();
+
+    estimator.contextualize(conf, myAppContext);
+
+    speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
+
+    dispatcher.register(Speculator.EventType.class, speculator);
+
+    dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler());
+
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    speculator.init(conf);
+    speculator.start();
+
+    // Now that the plumbing is hooked up, we do the following:
+    //  do until all tasks are finished, ...
+    //  1: If we have spare capacity, assign as many map tasks as we can, then
+    //     assign as many reduce tasks as we can.  Note that an odd reduce
+    //     task might be started while there are still map tasks, because
+    //     map tasks take 3 slots and reduce tasks 4 slots.
+    //  2: Send a speculation event for every task attempt that's running
+    //  note that new attempts might get started by the speculator
+
+    // discover undone tasks
+    int undoneMaps = MAP_TASKS;
+    int undoneReduces = REDUCE_TASKS;
+
+    // build a task sequence where all the maps precede any of the reduces
+    List<Task> allTasksSequence = new LinkedList<Task>();
+
+    allTasksSequence.addAll(myJob.getTasks(TaskType.MAP).values());
+    allTasksSequence.addAll(myJob.getTasks(TaskType.REDUCE).values());
+
+    while (undoneMaps + undoneReduces > 0) {
+      undoneMaps = 0; undoneReduces = 0;
+      // start all attempts which are new but for which there is enough slots
+      for (Task task : allTasksSequence) {
+        if (!task.isFinished()) {
+          if (task.getType() == TaskType.MAP) {
+            ++undoneMaps;
+          } else {
+            ++undoneReduces;
+          }
+        }
+        for (TaskAttempt attempt : task.getAttempts().values()) {
+          if (attempt.getState() == TaskAttemptState.NEW
+              && INITIAL_NUMBER_FREE_SLOTS - slotsInUse.get()
+                    >= taskTypeSlots(task.getType())) {
+            MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt;
+            SpeculatorEvent event
+                = new SpeculatorEvent(attempt.getID(), false, clock.getTime());
+            speculator.handle(event);
+            attemptImpl.startUp();
+          } else {
+            // If a task attempt is in progress we should send the news to
+            // the Speculator.
+            TaskAttemptStatus status = new TaskAttemptStatus();
+            status.id = attempt.getID();
+            status.progress = attempt.getProgress();
+            status.stateString = attempt.getState().name();
+            status.taskState = attempt.getState();
+            SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime());
+            speculator.handle(event);
+          }
+        }
+      }
+
+      long startTime = System.currentTimeMillis();
+
+      // drain the speculator event queue
+      while (!speculator.eventQueueEmpty()) {
+        Thread.yield();
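+        // bail out if the queue has not drained within ~130s of wall-clock time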
+        if (System.currentTimeMillis() > startTime + 130000) {
+          return;
+        }
+      }
+
+      clock.advanceTime(1000L);
+
+      if (clock.getTime() % 10000L == 0L) {
+        speculator.scanForSpeculations();
+      }
+    }
+
+    Assert.assertEquals("We got the wrong number of successful speculations.",
+        expectedSpeculations, successfulSpeculations.get());
+  }
+
+  @Test
+  public void testLegacyEstimator() throws Exception {
+    TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
+    coreTestEstimator(specificEstimator, 3);
+  }
+
+  @Test
+  public void testExponentialEstimator() throws Exception {
+    TaskRuntimeEstimator specificEstimator
+        = new ExponentiallySmoothedTaskRuntimeEstimator();
+    coreTestEstimator(specificEstimator, 3);
+  }
+
+  int taskTypeSlots(TaskType type) {
+    return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
+  }
+
+  class SpeculationRequestEventHandler implements EventHandler<TaskEvent> {
+
+    @Override
+    public void handle(TaskEvent event) {
+      TaskId taskID = event.getTaskID();
+      Task task = myJob.getTask(taskID);
+
+      Assert.assertEquals("Wrong event type",
+          TaskEventType.T_ADD_SPEC_ATTEMPT, event.getType());
+
+      System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID);
+
+      addAttempt(task);
+    }
+  }
+
+  void addAttempt(Task task) {
+    MyTaskImpl myTask = (MyTaskImpl) task;
+
+    myTask.addAttempt();
+  }
+
+  class MyTaskImpl implements Task {
+    private final TaskId taskID;
+    private final Map<TaskAttemptId, TaskAttempt> attempts
+        = new ConcurrentHashMap<TaskAttemptId, TaskAttempt>(4);
+
+    MyTaskImpl(JobId jobID, int index, TaskType type) {
+      taskID = recordFactory.newRecordInstance(TaskId.class);
+      taskID.setId(index);
+      taskID.setTaskType(type);
+      taskID.setJobId(jobID);
+    }
+
+    void addAttempt() {
+      TaskAttempt taskAttempt
+          = new MyTaskAttemptImpl(taskID, attempts.size(), clock);
+      TaskAttemptId taskAttemptID = taskAttempt.getID();
+
+      attempts.put(taskAttemptID, taskAttempt);
+
+      System.out.println("TLTRE.MyTaskImpl.addAttempt " + getID());
+
+      SpeculatorEvent event = new SpeculatorEvent(taskID, +1);
+      dispatcher.getEventHandler().handle(event);
+    }
+
+    @Override
+    public TaskId getID() {
+      return taskID;
+    }
+
+    @Override
+    public TaskReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Counters getCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public float getProgress() {
+      float result = 0.0F;
+
+      for (TaskAttempt attempt : attempts.values()) {
+        result = Math.max(result, attempt.getProgress());
+      }
+
+      return result;
+    }
+
+    @Override
+    public TaskType getType() {
+      return taskID.getTaskType();
+    }
+
+    @Override
+    public Map<TaskAttemptId, TaskAttempt> getAttempts() {
+      Map<TaskAttemptId, TaskAttempt> result
+          = new HashMap<TaskAttemptId, TaskAttempt>(attempts.size());
+      result.putAll(attempts);
+      return result;
+    }
+
+    @Override
+    public TaskAttempt getAttempt(TaskAttemptId attemptID) {
+      return attempts.get(attemptID);
+    }
+
+    @Override
+    public boolean isFinished() {
+      for (TaskAttempt attempt : attempts.values()) {
+        if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+
+    @Override
+    public boolean canCommit(TaskAttemptId taskAttemptID) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public TaskState getState() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+  }
+
+  class MyJobImpl implements Job {
+    private final JobId jobID;
+    private final Map<TaskId, Task> allTasks = new HashMap<TaskId, Task>();
+    private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
+    private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
+
+    MyJobImpl(JobId jobID, int numMaps, int numReduces) {
+      this.jobID = jobID;
+      for (int i = 0; i < numMaps; ++i) {
+        Task newTask = new MyTaskImpl(jobID, i, TaskType.MAP);
+        mapTasks.put(newTask.getID(), newTask);
+        allTasks.put(newTask.getID(), newTask);
+      }
+      for (int i = 0; i < numReduces; ++i) {
+        Task newTask = new MyTaskImpl(jobID, i, TaskType.REDUCE);
+        reduceTasks.put(newTask.getID(), newTask);
+        allTasks.put(newTask.getID(), newTask);
+      }
+
+      // give every task an attempt
+      for (Task task : allTasks.values()) {
+        MyTaskImpl myTaskImpl = (MyTaskImpl) task;
+        myTaskImpl.addAttempt();
+      }
+    }
+
+    @Override
+    public JobId getID() {
+      return jobID;
+    }
+
+    @Override
+    public JobState getState() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public JobReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public Counters getAllCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Map<TaskId, Task> getTasks() {
+      return allTasks;
+    }
+
+    @Override
+    public Map<TaskId, Task> getTasks(TaskType taskType) {
+      return taskType == TaskType.MAP ? mapTasks : reduceTasks;
+    }
+
+    @Override
+    public Task getTask(TaskId taskID) {
+      return allTasks.get(taskID);
+    }
+
+    @Override
+    public List<String> getDiagnostics() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int getCompletedMaps() {
+      return completedMaps.get();
+    }
+
+    @Override
+    public int getCompletedReduces() {
+      return completedReduces.get();
+    }
+
+    @Override
+    public TaskAttemptCompletionEvent[]
+            getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getQueueName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int getTotalMaps() {
+      return mapTasks.size();
+    }
+
+    @Override
+    public int getTotalReduces() {
+      return reduceTasks.size();
+    }
+
+    @Override
+    public boolean isUber() {
+      return false;
+    }
+
+    @Override
+    public boolean checkAccess(UserGroupInformation callerUGI,
+        JobACL jobOperation) {
+      return true;
+    }
+    
+    @Override
+    public String getUserName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Path getConfFile() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Map<JobACL, AccessControlList> getJobACLs() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public List<AMInfo> getAMInfos() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+    @Override
+    public Configuration loadConfFile() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Configuration getConf() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+  }
+
+  /*
+   * We follow the pattern of the real XxxImpl.  We create a job and initialize
+   * it with a full suite of tasks, which in turn have one attempt each in the
+   * NEW state.  Attempts transition only from NEW to RUNNING to SUCCEEDED.
+   */
+  class MyTaskAttemptImpl implements TaskAttempt {
+    private final TaskAttemptId myAttemptID;
+
+    long startMockTime = Long.MIN_VALUE;
+
+    long shuffleCompletedTime = Long.MAX_VALUE;
+
+    TaskAttemptState overridingState = TaskAttemptState.NEW;
+
+    MyTaskAttemptImpl(TaskId taskID, int index, Clock clock) {
+      myAttemptID = recordFactory.newRecordInstance(TaskAttemptId.class);
+      myAttemptID.setId(index);
+      myAttemptID.setTaskId(taskID);
+    }
+
+    void startUp() {
+      startMockTime = clock.getTime();
+      overridingState = null;
+
+      slotsInUse.addAndGet(taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
+
+      System.out.println("TLTRE.MyTaskAttemptImpl.startUp starting " + getID());
+
+      SpeculatorEvent event = new SpeculatorEvent(getID().getTaskId(), -1);
+      dispatcher.getEventHandler().handle(event);
+    }
+
+    @Override
+    public NodeId getNodeId() throws UnsupportedOperationException{
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public TaskAttemptId getID() {
+      return myAttemptID;
+    }
+
+    @Override
+    public TaskAttemptReport getReport() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public List<String> getDiagnostics() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Counters getCounters() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int getShufflePort() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
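+    // Simulated runtime (in seconds) keyed off the task index; the first
+    // attempt of every 40th task is a 600s straggler that gives the
+    // speculator something to catch.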
+    private float getCodeRuntime() {
+      int taskIndex = myAttemptID.getTaskId().getId();
+      int attemptIndex = myAttemptID.getId();
+
+      float result = 200.0F;
+
+      switch (taskIndex % 4) {
+        case 0:
+          if (taskIndex % 40 == 0 && attemptIndex == 0) {
+            result = 600.0F;
+          }
+          break;
+        case 2:
+          break;
+
+        case 1:
+          result = 150.0F;
+          break;
+
+        case 3:
+          result = 250.0F;
+          break;
+      }
+
+      return result;
+    }
+
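+    // Map progress is linear in mock time: e.g. with a 200s simulated runtime,
+    // 100,000 mock-ms after startUp() the attempt reports
+    // 100000 / (200 * 1000) = 0.5.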
+    private float getMapProgress() {
+      float runtime = getCodeRuntime();
+
+      return Math.min
+          ((float) (clock.getTime() - startMockTime) / (runtime * 1000.0F), 1.0F);
+    }
+
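+    // Reduce progress models the shuffle as the first half (scaled by the
+    // fraction of finished maps) and the reduce code as the second half,
+    // which runs at half speed (runtime * 2000.0F).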
+    private float getReduceProgress() {
+      Job job = myAppContext.getJob(myAttemptID.getTaskId().getJobId());
+      float runtime = getCodeRuntime();
+
+      Collection<Task> allMapTasks = job.getTasks(TaskType.MAP).values();
+
+      int numberMaps = allMapTasks.size();
+      int numberDoneMaps = 0;
+
+      for (Task mapTask : allMapTasks) {
+        if (mapTask.isFinished()) {
+          ++numberDoneMaps;
+        }
+      }
+
+      if (numberMaps == numberDoneMaps) {
+        shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime());
+
+        return Math.min
+            ((float) (clock.getTime() - shuffleCompletedTime)
+                        / (runtime * 2000.0F) + 0.5F,
+             1.0F);
+      } else {
+        return ((float) numberDoneMaps) / numberMaps * 0.5F;
+      }
+    }
+
+    // we compute progress from time and an algorithm now
+    @Override
+    public float getProgress() {
+      if (overridingState == TaskAttemptState.NEW) {
+        return 0.0F;
+      }
+      return myAttemptID.getTaskId().getTaskType() == TaskType.MAP
+          ? getMapProgress() : getReduceProgress();
+    }
+
+    @Override
+    public TaskAttemptState getState() {
+      if (overridingState != null) {
+        return overridingState;
+      }
+      TaskAttemptState result
+          = getProgress() < 1.0F ? TaskAttemptState.RUNNING : TaskAttemptState.SUCCEEDED;
+
+      if (result == TaskAttemptState.SUCCEEDED) {
+        overridingState = TaskAttemptState.SUCCEEDED;
+
+        System.out.println("MyTaskAttemptImpl.getState() -- attempt " + myAttemptID + " finished.");
+
+        slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
+
+        (myAttemptID.getTaskId().getTaskType() == TaskType.MAP
+            ? completedMaps : completedReduces).getAndIncrement();
+
+        // check for a spectacularly successful speculation
+        TaskId taskID = myAttemptID.getTaskId();
+
+        Task task = myJob.getTask(taskID);
+
+        for (TaskAttempt otherAttempt : task.getAttempts().values()) {
+          if (otherAttempt != this
+              && otherAttempt.getState() == TaskAttemptState.RUNNING) {
+            // we had two instances running.  Try to determine how much
+            //  we might have saved by speculation
+            if (getID().getId() > otherAttempt.getID().getId()) {
+              // the speculation won
+              successfulSpeculations.getAndIncrement();
+              float hisProgress = otherAttempt.getProgress();
+              long hisStartTime = ((MyTaskAttemptImpl)otherAttempt).startMockTime;
+              System.out.println("TLTRE:A speculation finished at time "
+                  + clock.getTime()
+                  + ".  The stalled attempt is at " + (hisProgress * 100.0)
+                  + "% progress, and it started at "
+                  + hisStartTime + ", which is "
+                  + (clock.getTime() - hisStartTime) + " ago.");
+              long originalTaskEndEstimate
+                  = (hisStartTime
+                      + estimator.estimatedRuntime(otherAttempt.getID()));
+              System.out.println(
+                  "TLTRE: We would have expected the original attempt to take "
+                  + estimator.estimatedRuntime(otherAttempt.getID())
+                  + ", finishing at " + originalTaskEndEstimate);
+              long estimatedSavings = originalTaskEndEstimate - clock.getTime();
+              taskTimeSavedBySpeculation.addAndGet(estimatedSavings);
+              System.out.println("TLTRE: The task is " + task.getID());
+              slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
+              ((MyTaskAttemptImpl)otherAttempt).overridingState
+                  = TaskAttemptState.KILLED;
+            } else {
+              System.out.println(
+                  "TLTRE: The normal attempt beat the speculation in "
+                  + task.getID());
+            }
+          }
+        }
+      }
+
+      return result;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return getProgress() == 1.0F;
+    }
+
+    @Override
+    public ContainerId getAssignedContainerID() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getNodeHttpAddress() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+    @Override
+    public String getNodeRackName() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long getLaunchTime() {
+      return startMockTime;
+    }
+
+    @Override
+    public long getFinishTime() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long getShuffleFinishTime() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public long getSortFinishTime() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public String getAssignedContainerMgrAddress() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+
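+  /**
+   * A deterministic clock for tests: time stands still until the test calls
+   * setMeasuredTime() or advanceTime(), making estimator timings repeatable.
+   */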
+  static class MockClock implements Clock {
+    private long currentTime = 0;
+
+    public long getTime() {
+      return currentTime;
+    }
+
+    void setMeasuredTime(long newTime) {
+      currentTime = newTime;
+    }
+
+    void advanceTime(long increment) {
+      currentTime += increment;
+    }
+  }
+
+  class MyAppMaster extends CompositeService {
+    final Clock clock;
+    public MyAppMaster(Clock clock) {
+      super(MyAppMaster.class.getName());
+      if (clock == null) {
+        clock = new SystemClock();
+      }
+      this.clock = clock;
+      LOG.info("Created MyAppMaster");
+    }
+  }
+
+  class MyAppContext implements AppContext {
+    private final ApplicationAttemptId myAppAttemptID;
+    private final ApplicationId myApplicationID;
+    private final JobId myJobID;
+    private final Map<JobId, Job> allJobs;
+
+    MyAppContext(int numberMaps, int numberReduces) {
+      myApplicationID = recordFactory.newRecordInstance(ApplicationId.class);
+      myApplicationID.setClusterTimestamp(clock.getTime());
+      myApplicationID.setId(1);
+
+      myAppAttemptID = recordFactory
+          .newRecordInstance(ApplicationAttemptId.class);
+      myAppAttemptID.setApplicationId(myApplicationID);
+      myAppAttemptID.setAttemptId(0);
+      myJobID = recordFactory.newRecordInstance(JobId.class);
+      myJobID.setAppId(myApplicationID);
+
+      Job myJob = new MyJobImpl(myJobID, numberMaps, numberReduces);
+
+      allJobs = Collections.singletonMap(myJobID, myJob);
+    }
+
+    @Override
+    public ApplicationAttemptId getApplicationAttemptId() {
+      return myAppAttemptID;
+    }
+
+    @Override
+    public ApplicationId getApplicationID() {
+      return myApplicationID;
+    }
+
+    @Override
+    public Job getJob(JobId jobID) {
+      return allJobs.get(jobID);
+    }
+
+    @Override
+    public Map<JobId, Job> getAllJobs() {
+      return allJobs;
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    @Override
+    public CharSequence getUser() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Clock getClock() {
+      return clock;
+    }
+
+    @Override
+    public String getApplicationName() {
+      return null;
+    }
+
+    @Override
+    public long getStartTime() {
+      return 0;
+    }
+
+    @Override
+    public ClusterInfo getClusterInfo() {
+      return new ClusterInfo();
+    }
+
+    @Override
+    public AMContainer getContainer(ContainerId containerId) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public AMContainerMap getAllContainers() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public AMNode getNode(NodeId nodeId) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public AMNodeMap getAllNodes() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestStagingCleanup.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,187 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+
+/**
+ * Make sure that the job staging directory clean up happens.
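+ * Covers direct staging deletion when the job-finish event is handled, and
+ * the ordering requirement that staging is cleaned before the container
+ * allocator is stopped.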
+ */
+public class TestStagingCleanup {
+
+  private Configuration conf = new Configuration();
+  private FileSystem fs;
+  private String stagingJobDir = "tmpJobDir";
+  private Path stagingJobPath = new Path(stagingJobDir);
+  private final static RecordFactory recordFactory = RecordFactoryProvider.
+      getRecordFactory(null);
+
+  @Test
+  public void testDeletionofStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+        ApplicationAttemptId.class);
+    attemptId.setAttemptId(0);
+    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    appId.setClusterTimestamp(System.currentTimeMillis());
+    appId.setId(0);
+    attemptId.setApplicationId(appId);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    MRAppMaster appMaster = new TestMRApp(attemptId);
+    appMaster.init(conf);
+    EventHandler<JobFinishEvent> handler =
+        appMaster.createJobFinishEventHandler();
+    handler.handle(new JobFinishEvent(jobid));
+    verify(fs).delete(stagingJobPath, true);
+  }
+
+  private class TestMRApp extends MRAppMaster {
+
+    public TestMRApp(ApplicationAttemptId applicationAttemptId) {
+      super(applicationAttemptId, BuilderUtils.newContainerId(
+          applicationAttemptId, 1), "testhost", 2222, 3333, System
+          .currentTimeMillis());
+    }
+
+    @Override
+    protected FileSystem getFileSystem(Configuration conf) {
+      return fs;
+    }
+
+    @Override
+    protected void sysexit() {
+    }
+
+    @Override
+    public Configuration getConfig() {
+      return conf;
+    }
+  }
+
+  private final class MRAppTestCleanup extends MRApp {
+    boolean stoppedContainerAllocator;
+    boolean cleanedBeforeContainerAllocatorStopped;
+
+    public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+      stoppedContainerAllocator = false;
+      cleanedBeforeContainerAllocatorStopped = false;
+    }
+
+    @Override
+    protected Job createJob(Configuration conf) {
+      UserGroupInformation currentUser = null;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+      Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+          getDispatcher().getEventHandler(),
+          getTaskAttemptListener(), getContext().getClock(),
+          getCommitter(), isNewApiCommitter(),
+          currentUser.getUserName(), getTaskHeartbeatHandler(), getContext());
+      ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+      getDispatcher().register(JobFinishEvent.Type.class,
+          createJobFinishEventHandler());
+
+      return newJob;
+    }
+
+    @Override
+    protected RMContainerRequestor createRMContainerRequestor(
+        ClientService clientService, AppContext appContext) {
+      return new TestCleanupContainerRequestor(clientService, appContext);
+    }
+
+    private class TestCleanupContainerRequestor extends MRAppContainerRequestor {
+      public TestCleanupContainerRequestor(ClientService clientService,
+          AppContext context) {
+        super(clientService, context);
+      }
+
+      @Override
+      public synchronized void stop() {
+        stoppedContainerAllocator = true;
+        super.stop();
+      }
+    }
+
+    @Override
+    public void cleanupStagingDir() throws IOException {
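+      // Record ordering: cleanup must run while the container allocator is
+      // still up; if the allocator is already stopped when this runs, the
+      // flag stays false and testStagingCleanupOrder fails.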
+      cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
+    }
+
+    @Override
+    protected void sysexit() {
+    }
+  }
+
+  @Test
+  public void testStagingCleanupOrder() throws Exception {
+    MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
+        this.getClass().getName(), true);
+    JobImpl job = (JobImpl)app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    int waitTime = 20 * 1000;
+    while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
+      Thread.sleep(100);
+      waitTime -= 100;
+    }
+    Assert.assertTrue("Staging directory not cleaned before notifying RM",
+        app.cleanedBeforeContainerAllocatorStopped);
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestTaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestTaskHeartbeatHandler.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestTaskHeartbeatHandler.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestTaskHeartbeatHandler.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,73 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
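+/**
+ * Checks that TaskHeartbeatHandler notices attempts that stop pinging: with a
+ * 10ms timeout and a 10ms check interval, a registered attempt that never
+ * reports progress should be timed out while the test sleeps.
+ */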
+public class TestTaskHeartbeatHandler {
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testTimeout() throws InterruptedException {
+    EventHandler mockHandler = mock(EventHandler.class);
+    Clock clock = new SystemClock();
+    AppContext context = mock(AppContext.class);
+    when(context.getEventHandler()).thenReturn(mockHandler);
+    when(context.getClock()).thenReturn(clock);
+    
+    TaskHeartbeatHandler hb = new TaskHeartbeatHandler(context, 1);
+
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.TASK_TIMEOUT, 10); //10 ms
+    conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
+    
+    hb.init(conf);
+    hb.start();
+    try {
+      ApplicationId appId = BuilderUtils.newApplicationId(0L, 5);
+      JobId jobId = MRBuilderUtils.newJobId(appId, 4);
+      TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
+      TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
+      hb.register(taid);
+      Thread.sleep(100);
+      // Events are only emitted for attempts the handler times out, so the
+      // expired registration above should account for every handled event
+      verify(mockHandler, times(2)).handle(any(Event.class));
+    } finally {
+      hb.stop();
+    }
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,307 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.job.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl.InitTransition;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl.JobNoTasksCompletedTransition;
+import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests various functions of the JobImpl class
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TestJobImpl {
+  
+  @Test
+  public void testJobNoTasksTransition() { 
+    JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
+    JobImpl mockJob = mock(JobImpl.class);
+
+    // Force checkJobCompleteSuccess to return null
+    Task mockTask = mock(Task.class);
+    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+    tasks.put(mockTask.getID(), mockTask);
+    mockJob.tasks = tasks;
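+    // checkJobCompleteSuccess then returns null, so the transition is
+    // expected to fall back to the job's (mocked) current state below.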
+
+    when(mockJob.getState()).thenReturn(JobState.ERROR);
+    JobEvent mockJobEvent = mock(JobEvent.class);
+    JobState state = trans.transition(mockJob, mockJobEvent);
+    Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
+        JobState.ERROR, state);
+  }
+
+  @Test
+  public void testCommitJobFailsJob() {
+
+    JobImpl mockJob = mock(JobImpl.class);
+    mockJob.tasks = new HashMap<TaskId, Task>();
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    JobContext mockJobContext = mock(JobContext.class);
+
+    when(mockJob.getCommitter()).thenReturn(mockCommitter);
+    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
+    when(mockJob.getJobContext()).thenReturn(mockJobContext);
+    doNothing().when(mockJob).setFinishTime();
+    doNothing().when(mockJob).logJobHistoryFinishedEvent();
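+    // Echo back whatever terminal state is requested so the assertions can
+    // observe which state checkJobCompleteSuccess chose.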
+    when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED);
+    when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED);
+    when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED);
+
+    try {
+      doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
+    } catch (IOException e) {
+      // commitJob stubbed out, so this can't happen
+    }
+    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
+        "when the job commit fails",
+        JobImpl.checkJobCompleteSuccess(mockJob));
+    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
+        JobState.FAILED, JobImpl.checkJobCompleteSuccess(mockJob));
+  }
+
+  @Test
+  public void testCheckJobCompleteSuccess() {
+    
+    JobImpl mockJob = mock(JobImpl.class);
+    mockJob.tasks = new HashMap<TaskId, Task>();
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    JobContext mockJobContext = mock(JobContext.class);
+    
+    when(mockJob.getCommitter()).thenReturn(mockCommitter);
+    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
+    when(mockJob.getJobContext()).thenReturn(mockJobContext);
+    doNothing().when(mockJob).setFinishTime();
+    doNothing().when(mockJob).logJobHistoryFinishedEvent();
+    when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED);
+
+    try {
+      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
+    } catch (IOException e) {
+      // commitJob stubbed out, so this can't happen
+    }
+    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
+      "for successful job",
+      JobImpl.checkJobCompleteSuccess(mockJob));
+    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
+        JobState.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+  }
+
+  @Test
+  public void testCheckJobCompleteSuccessFailed() {
+    JobImpl mockJob = mock(JobImpl.class);
+
+    // Leave completedTasks (zero) smaller than the number of tasks so the
+    // job does not count as successfully completed
+    Task mockTask = mock(Task.class);
+    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+    tasks.put(mockTask.getID(), mockTask);
+    mockJob.tasks = tasks;
+    
+    try {
+      // Just in case the code breaks and reaches these calls
+      OutputCommitter mockCommitter = mock(OutputCommitter.class);
+      EventHandler mockEventHandler = mock(EventHandler.class);
+      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
+      doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    } catch (IOException e) {
+      e.printStackTrace();    
+    }
+    Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
+      "for unsuccessful job",
+      JobImpl.checkJobCompleteSuccess(mockJob));
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    TestJobImpl t = new TestJobImpl();
+    t.testJobNoTasksTransition();
+    t.testCheckJobCompleteSuccess();
+    t.testCheckJobCompleteSuccessFailed();
+    t.testCheckAccess();
+  }
+
+  @Test
+  public void testCheckAccess() {
+    // Create two unique users
+    String user1 = System.getProperty("user.name");
+    String user2 = user1 + "1234";
+    UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user1);
+    UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser(user2);
+
+    // Create the job
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+
+    // Setup configuration access only to user1 (owner)
+    Configuration conf1 = new Configuration();
+    conf1.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf1.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access to the user1 (owner) and user2
+    Configuration conf2 = new Configuration();
+    conf2.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf2.set(MRJobConfig.JOB_ACL_VIEW_JOB, user2);
+
+    // Verify access
+    JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access with security enabled and access to all
+    Configuration conf3 = new Configuration();
+    conf3.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf3.set(MRJobConfig.JOB_ACL_VIEW_JOB, "*");
+
+    // Verify access
+    JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access without security enabled
+    Configuration conf4 = new Configuration();
+    conf4.setBoolean(MRConfig.MR_ACLS_ENABLED, false);
+    conf4.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration with security enabled and an empty view ACL; a
+    // null job operation has no ACL attached, so it should be allowed
+    Configuration conf5 = new Configuration();
+    conf5.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf5.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
+        null, null, null, true, null, 0, null, null, null);
+    Assert.assertTrue(job5.checkAccess(ugi1, null));
+    Assert.assertTrue(job5.checkAccess(ugi2, null));
+  }
+
+  @Test
+  public void testUberDecision() throws Exception {
+
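+    // Uber decision: the job runs inside the AM only when its map and reduce
+    // counts fit under the JOB_UBERTASK_MAXMAPS / JOB_UBERTASK_MAXREDUCES
+    // limits (the stubbed InitTransition below always yields two map splits).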
+    // with default values, no of maps is 2
+    Configuration conf = new Configuration();
+    boolean isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+
+    // enable uber mode, no of maps is 2
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
+
+    // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
+    // reduces is 0
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 0);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+
+    // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
+    // reduces is 1
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
+
+    // enable uber mode, no of maps is 2 and uber task max maps is 1
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
+    isUber = testUberDecision(conf);
+    Assert.assertFalse(isUber);
+  }
+
+  private boolean testUberDecision(Configuration conf) {
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+    JobImpl job = new JobImpl(jobId, Records
+        .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
+        null, mock(JobTokenSecretManager.class), null, null, null,
+        mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null, null);
+    InitTransition initTransition = getInitTransition();
+    JobEvent mockJobEvent = mock(JobEvent.class);
+    initTransition.transition(job, mockJobEvent);
+    return job.isUber();
+  }
+
+  private InitTransition getInitTransition() {
+    InitTransition initTransition = new InitTransition() {
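+      // Stub out split creation so InitTransition never touches a real
+      // FileSystem; two meta-info entries give the job two map tasks.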
+      @Override
+      protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+        return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
+            new TaskSplitMetaInfo() };
+      }
+    };
+    return initTransition;
+  }
+}


