hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1384614 [2/2] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java...
Date Fri, 14 Sep 2012 00:41:01 GMT
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Fri Sep 14 00:41:00 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -47,6 +48,7 @@ import org.apache.hadoop.mapreduce.split
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -63,11 +65,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
@@ -77,7 +82,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
-import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventReleased;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerLaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
@@ -123,6 +128,7 @@ public class MRApp extends MRAppMaster {
   private File testWorkDir;
   private Path testAbsPath;
   private ClusterInfo clusterInfo;
+  private volatile boolean exited = false;
 
   public static String NM_HOST = "localhost";
   public static int NM_PORT = 1234;
@@ -245,26 +251,6 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
-  /**
-   * Helper method to move a task attempt into a final state.
-   */
-  public void sendFinishToTaskAttempt(TaskAttempt taskAttempt,
-      TaskAttemptState finalState) throws Exception {
-    if (finalState == TaskAttemptState.SUCCEEDED) {
-      getContext().getEventHandler().handle(
-          new TaskAttemptEvent(taskAttempt.getID(),
-              TaskAttemptEventType.TA_DONE));
-    } else if (finalState == TaskAttemptState.KILLED) {
-      getContext().getEventHandler()
-          .handle(
-              new TaskAttemptEventKillRequest(taskAttempt.getID(),
-                  "Kill requested"));
-    } else if (finalState == TaskAttemptState.FAILED) {
-      getContext().getEventHandler().handle(
-          new TaskAttemptEvent(taskAttempt.getID(),
-              TaskAttemptEventType.TA_FAIL_REQUEST));
-    }
-  }
 
   public void waitForState(TaskAttempt attempt, 
       TaskAttemptState finalState) throws Exception {
@@ -300,6 +286,16 @@ public class MRApp extends MRAppMaster {
         report.getTaskState());
   }
 
+  public void waitForAMExit() throws Exception {
+    int timeoutSecs = 0;
+    while (!exited && timeoutSecs ++ < 20) {
+      System.out.println("Waiting for AM exit");
+      Thread.sleep(500);
+    }
+    System.out.print("AM Exit State is: " + exited);
+    Assert.assertEquals("AM did not exit (timedout)", true, exited);
+  }
+  
   public void waitForState(Job job, JobState finalState) throws Exception {
     int timeoutSecs = 0;
     JobReport report = job.getReport();
@@ -396,8 +392,10 @@ public class MRApp extends MRAppMaster {
   }
   
   protected class MRAppJobFinishHandler extends JobFinishEventHandlerCR {
+    
     @Override
     protected void exit() {
+      exited = true;
     }
     
     @Override
@@ -506,7 +504,7 @@ public class MRApp extends MRAppMaster {
     public MockContainerLauncher() {
     }
 
-//    @Override
+    @Override
     public void handle(NMCommunicatorEvent event) {
       switch (event.getType()) {
       case CONTAINER_LAUNCH_REQUEST:
@@ -534,7 +532,7 @@ public class MRApp extends MRAppMaster {
       case CONTAINER_STOP_REQUEST:
         ContainerStatus cs = Records.newRecord(ContainerStatus.class);
         cs.setContainerId(event.getContainerId());
-        getContext().getEventHandler().handle(new AMContainerEventReleased(cs));
+        getContext().getEventHandler().handle(new AMContainerEventCompleted(cs));
         break;
       }
     }
@@ -572,13 +570,14 @@ public class MRApp extends MRAppMaster {
     @Override public void decContainerReq(ContainerRequest req) {}
 
     public void handle(RMCommunicatorEvent rawEvent) {
-      LOG.info("XXX: MRAppContainerRequestor handling event of type:" + rawEvent.getType() + ", event: " + rawEvent);
+      LOG.info("XXX: MRAppContainerRequestor handling event of type:" + rawEvent.getType() + ", event: " + rawEvent + ", for containerId: ");
       switch (rawEvent.getType()) {
       case CONTAINER_DEALLOCATE:
         numReleaseRequests++;
         ContainerStatus cs = Records.newRecord(ContainerStatus.class);
         cs.setContainerId(((RMCommunicatorContainerDeAllocateRequestEvent)rawEvent).getContainerId());
-        getContext().getEventHandler().handle(new AMContainerEventReleased(cs));
+        getContext().getEventHandler().handle(new AMContainerEventCompleted(cs));
+        LOG.info("XXX: Sending out C_COMPLETE for containerId: " + cs.getContainerId());
         break;
       default:
         LOG.warn("Invalid event of type: " + rawEvent.getType() + ", Event: "
@@ -615,7 +614,7 @@ public class MRApp extends MRAppMaster {
         Container container = BuilderUtils.newContainer(cId, nodeId,
             NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
         
-        getContext().getAllContainers().addNewContainer(container);
+        getContext().getAllContainers().addContainerIfNew(container);
         getContext().getAllNodes().nodeSeen(nodeId);
         
         JobID id = TypeConverter.fromYarn(applicationId);
@@ -627,11 +626,13 @@ public class MRApp extends MRAppMaster {
         
         attemptToContainerIdMap.put(lEvent.getAttemptID(), cId);
         if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
-          LOG.info("XXX: Sending launch request for container: " + lEvent);
+          LOG.info("XXX: Sending launch request for container: " + cId
+              + " for taskAttemptId: " + lEvent.getAttemptID());
           getContext().getEventHandler().handle(
               new AMContainerLaunchRequestEvent(cId, lEvent, appAcls, jobId));
         }
-        LOG.info("XXX: Assigning attempt [" + lEvent.getAttemptID() + "] to Container [" + cId + "]");
+        LOG.info("XXX: Assigning attempt [" + lEvent.getAttemptID()
+            + "] to Container [" + cId + "]");
         getContext().getEventHandler().handle(
             new AMContainerAssignTAEvent(cId, lEvent.getAttemptID(), lEvent
                 .getRemoteTask()));
@@ -640,12 +641,20 @@ public class MRApp extends MRAppMaster {
       case S_TA_STOP_REQUEST:
         // Send out a Container_stop_request.
         AMSchedulerTAStopRequestEvent stEvent = (AMSchedulerTAStopRequestEvent) rawEvent;
+        LOG.info("XXX: Handling S_TA_STOP_REQUEST for attemptId:" + stEvent.getAttemptID());
         getContext().getEventHandler().handle(
             new AMContainerEvent(attemptToContainerIdMap.get(stEvent
                 .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
 
         break;
       case S_TA_SUCCEEDED:
+        // No re-use in MRApp. Stop the container.
+        AMSchedulerTASucceededEvent suEvent = (AMSchedulerTASucceededEvent) rawEvent;
+        LOG.info("XXX: Handling S_TA_SUCCEEDED for attemptId: "
+            + suEvent.getAttemptID());
+        getContext().getEventHandler().handle(
+            new AMContainerEvent(attemptToContainerIdMap.get(suEvent
+                .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
         break;
       case S_CONTAINERS_ALLOCATED:
         break;
@@ -759,6 +768,50 @@ public class MRApp extends MRAppMaster {
       return splits;
     }
   }
+  
+
+  private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId taskAttemptId,
+      TaskAttemptState finalState) {
+    TaskAttemptStatus tas = new TaskAttemptStatus();
+    tas.id = taskAttemptId;
+    tas.progress = 1.0f;
+    tas.phase = Phase.CLEANUP;
+    tas.stateString = finalState.name();
+    tas.taskState = finalState;
+    Counters counters = new Counters();
+    tas.counters = counters;
+    return tas;
+  }
+
+  private void sendStatusUpdate(TaskAttemptId taskAttemptId,
+      TaskAttemptState finalState) {
+    TaskAttemptStatus tas = createTaskAttemptStatus(taskAttemptId, finalState);
+    getContext().getEventHandler().handle(
+        new TaskAttemptStatusUpdateEvent(taskAttemptId, tas));
+  }
 
+  /*
+   * Helper method to move a task attempt into a final state.
+   */
+  // TODO maybe rename to something like succeedTaskAttempt
+  public void sendFinishToTaskAttempt(TaskAttemptId taskAttemptId,
+      TaskAttemptState finalState, boolean sendStatusUpdate) throws Exception {
+    if (sendStatusUpdate) {
+      sendStatusUpdate(taskAttemptId, finalState);
+    }
+    if (finalState == TaskAttemptState.SUCCEEDED) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(taskAttemptId,
+              TaskAttemptEventType.TA_DONE));
+    } else if (finalState == TaskAttemptState.KILLED) {
+      getContext().getEventHandler()
+          .handle(new TaskAttemptEventKillRequest(taskAttemptId,
+                  "Kill requested"));
+    } else if (finalState == TaskAttemptState.FAILED) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(taskAttemptId,
+              TaskAttemptEventType.TA_FAIL_REQUEST));
+    }
+  }
 }
  

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java Fri Sep 14 00:41:00 2012
@@ -1,752 +1,778 @@
-///**
-//* Licensed to the Apache Software Foundation (ASF) under one
-//* or more contributor license agreements.  See the NOTICE file
-//* distributed with this work for additional information
-//* regarding copyright ownership.  The ASF licenses this file
-//* to you under the Apache License, Version 2.0 (the
-//* "License"); you may not use this file except in compliance
-//* with the License.  You may obtain a copy of the License at
-//*
-//*     http://www.apache.org/licenses/LICENSE-2.0
-//*
-//* Unless required by applicable law or agreed to in writing, software
-//* distributed under the License is distributed on an "AS IS" BASIS,
-//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//* See the License for the specific language governing permissions and
-//* limitations under the License.
-//*/
-//
-//package org.apache.hadoop.mapreduce.v2.app2;
-//
-//import java.io.File;
-//import java.io.FileInputStream;
-//import java.io.IOException;
-//import java.util.Iterator;
-//
-//import junit.framework.Assert;
-//
-//import org.apache.commons.logging.Log;
-//import org.apache.commons.logging.LogFactory;
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.io.NullWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.MRJobConfig;
-//import org.apache.hadoop.mapreduce.OutputCommitter;
-//import org.apache.hadoop.mapreduce.OutputFormat;
-//import org.apache.hadoop.mapreduce.RecordWriter;
-//import org.apache.hadoop.mapreduce.TaskAttemptContext;
-//import org.apache.hadoop.mapreduce.TypeConverter;
-//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
-//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler2;
-//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-//import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-//import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
-//import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-//import org.apache.hadoop.mapreduce.v2.app2.job.Job;
-//import org.apache.hadoop.mapreduce.v2.app2.job.Task;
-//import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
-//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
-//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherEvent;
-//import org.apache.hadoop.util.ReflectionUtils;
-//import org.apache.hadoop.yarn.event.EventHandler;
-//import org.junit.Test;
-//
-//@SuppressWarnings({"unchecked", "rawtypes"})
-//public class TestRecovery {
-//
-//  private static final Log LOG = LogFactory.getLog(TestRecovery.class);
-//  private static Path outputDir = new Path(new File("target", 
-//      TestRecovery.class.getName()).getAbsolutePath() + 
-//      Path.SEPARATOR + "out");
-//  private static String partFile = "part-r-00000";
-//  private Text key1 = new Text("key1");
-//  private Text key2 = new Text("key2");
-//  private Text val1 = new Text("val1");
-//  private Text val2 = new Text("val2");
-//
-//  /**
-//   * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
-//   * completely disappears because of failed launch, one attempt gets killed and
-//   * one attempt succeeds. AM crashes after the first tasks finishes and
-//   * recovers completely and succeeds in the second generation.
-//   * 
-//   * @throws Exception
-//   */
-//  @Test
-//  public void testCrashed() throws Exception {
-//
-//    int runCount = 0;
-//    long am1StartTimeEst = System.currentTimeMillis();
-//    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
-//    Configuration conf = new Configuration();
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    Job job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    long jobStartTime = job.getReport().getStartTime();
-//    //all maps would be running
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    Iterator<Task> it = job.getTasks().values().iterator();
-//    Task mapTask1 = it.next();
-//    Task mapTask2 = it.next();
-//    Task reduceTask = it.next();
-//    
-//    // all maps must be running
-//    app.waitForState(mapTask1, TaskState.RUNNING);
-//    app.waitForState(mapTask2, TaskState.RUNNING);
-//    
-//    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
-//    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
-//    
-//    //before sending the TA_DONE, event make sure attempt has come to 
-//    //RUNNING state
-//    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
-//    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
-//    
-//    // reduces must be in NEW state
-//    Assert.assertEquals("Reduce Task state not correct",
-//        TaskState.RUNNING, reduceTask.getReport().getTaskState());
-//
-//    /////////// Play some games with the TaskAttempts of the first task //////
-//    //send the fail signal to the 1st map task attempt
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            task1Attempt1.getID(),
-//            TaskAttemptEventType.TA_FAILMSG));
-//    
-//    app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
-//
-//    int timeOut = 0;
-//    while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
-//      Thread.sleep(2000);
-//      LOG.info("Waiting for next attempt to start");
-//    }
-//    Assert.assertEquals(2, mapTask1.getAttempts().size());
-//    Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
-//    itr.next();
-//    TaskAttempt task1Attempt2 = itr.next();
-//    
-//    // This attempt will automatically fail because of the way ContainerLauncher
-//    // is setup
-//    // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
-//    app.getContext().getEventHandler().handle(
-//      new TaskAttemptEvent(task1Attempt2.getID(),
-//        TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
-//    app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
-//
-//    timeOut = 0;
-//    while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
-//      Thread.sleep(2000);
-//      LOG.info("Waiting for next attempt to start");
-//    }
-//    Assert.assertEquals(3, mapTask1.getAttempts().size());
-//    itr = mapTask1.getAttempts().values().iterator();
-//    itr.next();
-//    itr.next();
-//    TaskAttempt task1Attempt3 = itr.next();
-//    
-//    app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
-//
-//    //send the kill signal to the 1st map 3rd attempt
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            task1Attempt3.getID(),
-//            TaskAttemptEventType.TA_KILL));
-//    
-//    app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
-//
-//    timeOut = 0;
-//    while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
-//      Thread.sleep(2000);
-//      LOG.info("Waiting for next attempt to start");
-//    }
-//    Assert.assertEquals(4, mapTask1.getAttempts().size());
-//    itr = mapTask1.getAttempts().values().iterator();
-//    itr.next();
-//    itr.next();
-//    itr.next();
-//    TaskAttempt task1Attempt4 = itr.next();
-//    
-//    app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
-//
-//    //send the done signal to the 1st map 4th attempt
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            task1Attempt4.getID(),
-//            TaskAttemptEventType.TA_DONE));
-//
-//    /////////// End of games with the TaskAttempts of the first task //////
-//
-//    //wait for first map task to complete
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//    long task1StartTime = mapTask1.getReport().getStartTime();
-//    long task1FinishTime = mapTask1.getReport().getFinishTime();
-//    
-//    //stop the app
-//    app.stop();
-//
-//    //rerun
-//    //in rerun the 1st map will be recovered from previous run
-//    long am2StartTimeEst = System.currentTimeMillis();
-//    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
-//    conf = new Configuration();
-//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    //all maps would be running
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    it = job.getTasks().values().iterator();
-//    mapTask1 = it.next();
-//    mapTask2 = it.next();
-//    reduceTask = it.next();
-//    
-//    // first map will be recovered, no need to send done
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//    
-//    app.waitForState(mapTask2, TaskState.RUNNING);
-//    
-//    task2Attempt = mapTask2.getAttempts().values().iterator().next();
-//    //before sending the TA_DONE, event make sure attempt has come to 
-//    //RUNNING state
-//    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
-//    
-//    //send the done signal to the 2nd map task
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            mapTask2.getAttempts().values().iterator().next().getID(),
-//            TaskAttemptEventType.TA_DONE));
-//    
-//    //wait to get it completed
-//    app.waitForState(mapTask2, TaskState.SUCCEEDED);
-//    
-//    //wait for reduce to be running before sending done
-//    app.waitForState(reduceTask, TaskState.RUNNING);
-//    //send the done signal to the reduce
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            reduceTask.getAttempts().values().iterator().next().getID(),
-//            TaskAttemptEventType.TA_DONE));
-//    
-//    app.waitForState(job, JobState.SUCCEEDED);
-//    app.verifyCompleted();
-//    Assert.assertEquals("Job Start time not correct",
-//        jobStartTime, job.getReport().getStartTime());
-//    Assert.assertEquals("Task Start time not correct",
-//        task1StartTime, mapTask1.getReport().getStartTime());
-//    Assert.assertEquals("Task Finish time not correct",
-//        task1FinishTime, mapTask1.getReport().getFinishTime());
-//    Assert.assertEquals(2, job.getAMInfos().size());
-//    int attemptNum = 1;
-//    // Verify AMInfo
-//    for (AMInfo amInfo : job.getAMInfos()) {
-//      Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
-//          .getAttemptId());
-//      Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
-//          .getApplicationAttemptId());
-//      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
-//      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
-//      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
-//    }
-//    long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
-//    long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
-//    Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
-//        && am1StartTimeReal <= am2StartTimeEst);
-//    Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
-//        && am2StartTimeReal <= System.currentTimeMillis());
-//    // TODO Add verification of additional data from jobHistory - whatever was
-//    // available in the failed attempt should be available here
-//  }
-//
-//  @Test
-//  public void testMultipleCrashes() throws Exception {
-//
-//    int runCount = 0;
-//    MRApp app =
-//        new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
-//          ++runCount);
-//    Configuration conf = new Configuration();
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    Job job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    //all maps would be running
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    Iterator<Task> it = job.getTasks().values().iterator();
-//    Task mapTask1 = it.next();
-//    Task mapTask2 = it.next();
-//    Task reduceTask = it.next();
-//    
-//    // all maps must be running
-//    app.waitForState(mapTask1, TaskState.RUNNING);
-//    app.waitForState(mapTask2, TaskState.RUNNING);
-//    
-//    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
-//    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
-//    
-//    //before sending the TA_DONE, event make sure attempt has come to 
-//    //RUNNING state
-//    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
-//    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
-//    
-//    // reduces must be in NEW state
-//    Assert.assertEquals("Reduce Task state not correct",
-//        TaskState.RUNNING, reduceTask.getReport().getTaskState());
-//
-//    //send the done signal to the 1st map
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//          task1Attempt1.getID(),
-//          TaskAttemptEventType.TA_DONE));
-//
-//    //wait for first map task to complete
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//    
-//    // Crash the app
-//    app.stop();
-//
-//    //rerun
-//    //in rerun the 1st map will be recovered from previous run
-//    app =
-//        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
-//          ++runCount);
-//    conf = new Configuration();
-//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    //all maps would be running
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    it = job.getTasks().values().iterator();
-//    mapTask1 = it.next();
-//    mapTask2 = it.next();
-//    reduceTask = it.next();
-//    
-//    // first map will be recovered, no need to send done
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//    
-//    app.waitForState(mapTask2, TaskState.RUNNING);
-//    
-//    task2Attempt = mapTask2.getAttempts().values().iterator().next();
-//    //before sending the TA_DONE, event make sure attempt has come to 
-//    //RUNNING state
-//    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
-//    
-//    //send the done signal to the 2nd map task
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            mapTask2.getAttempts().values().iterator().next().getID(),
-//            TaskAttemptEventType.TA_DONE));
-//    
-//    //wait to get it completed
-//    app.waitForState(mapTask2, TaskState.SUCCEEDED);
-//
-//    // Crash the app again.
-//    app.stop();
-//
-//    //rerun
-//    //in rerun the 1st and 2nd map will be recovered from previous run
-//    app =
-//        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
-//          ++runCount);
-//    conf = new Configuration();
-//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    //all maps would be running
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    it = job.getTasks().values().iterator();
-//    mapTask1 = it.next();
-//    mapTask2 = it.next();
-//    reduceTask = it.next();
-//    
-//    // The maps will be recovered, no need to send done
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//    app.waitForState(mapTask2, TaskState.SUCCEEDED);
-//
-//    //wait for reduce to be running before sending done
-//    app.waitForState(reduceTask, TaskState.RUNNING);
-//    //send the done signal to the reduce
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            reduceTask.getAttempts().values().iterator().next().getID(),
-//            TaskAttemptEventType.TA_DONE));
-//    
-//    app.waitForState(job, JobState.SUCCEEDED);
-//    app.verifyCompleted();
-//  }
-//
-//  @Test
-//  public void testOutputRecovery() throws Exception {
-//    int runCount = 0;
-//    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
-//        true, ++runCount);
-//    Configuration conf = new Configuration();
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    Job job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    Iterator<Task> it = job.getTasks().values().iterator();
-//    Task mapTask1 = it.next();
-//    Task reduceTask1 = it.next();
-//    
-//    // all maps must be running
-//    app.waitForState(mapTask1, TaskState.RUNNING);
-//    
-//    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
-//        .next();
-//    
-//    //before sending the TA_DONE, event make sure attempt has come to 
-//    //RUNNING state
-//    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
-//  
-//    //send the done signal to the map
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            task1Attempt1.getID(),
-//            TaskAttemptEventType.TA_DONE));
-//    
-//    //wait for map task to complete
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//
-//    // Verify the shuffle-port
-//    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
-//    
-//    app.waitForState(reduceTask1, TaskState.RUNNING);
-//    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
-//    
-//    // write output corresponding to reduce1
-//    writeOutput(reduce1Attempt1, conf);
-//    
-//    //send the done signal to the 1st reduce
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            reduce1Attempt1.getID(),
-//            TaskAttemptEventType.TA_DONE));
-//
-//    //wait for first reduce task to complete
-//    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
-//    
-//    //stop the app before the job completes.
-//    app.stop();
-//
-//    //rerun
-//    //in rerun the map will be recovered from previous run
-//    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
-//        ++runCount);
-//    conf = new Configuration();
-//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    it = job.getTasks().values().iterator();
-//    mapTask1 = it.next();
-//    reduceTask1 = it.next();
-//    Task reduceTask2 = it.next();
-//    
-//    // map will be recovered, no need to send done
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//
-//    // Verify the shuffle-port after recovery
-//    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
-//    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
-//    
-//    // first reduce will be recovered, no need to send done
-//    app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
-//    
-//    app.waitForState(reduceTask2, TaskState.RUNNING);
-//    
-//    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
-//        .iterator().next();
-//    //before sending the TA_DONE, event make sure attempt has come to 
-//    //RUNNING state
-//    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
-//    
-//   //send the done signal to the 2nd reduce task
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            reduce2Attempt.getID(),
-//            TaskAttemptEventType.TA_DONE));
-//    
-//    //wait to get it completed
-//    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
-//    
-//    app.waitForState(job, JobState.SUCCEEDED);
-//    app.verifyCompleted();
-//    validateOutput();
-//  }
-//
-//  @Test
-//  public void testOutputRecoveryMapsOnly() throws Exception {
-//    int runCount = 0;
-//    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
-//        true, ++runCount);
-//    Configuration conf = new Configuration();
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    Job job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    Iterator<Task> it = job.getTasks().values().iterator();
-//    Task mapTask1 = it.next();
-//    Task mapTask2 = it.next();
-//    Task reduceTask1 = it.next();
-//    
-//    // all maps must be running
-//    app.waitForState(mapTask1, TaskState.RUNNING);
-//    
-//    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
-//        .next();
-//    
-//    //before sending the TA_DONE, event make sure attempt has come to 
-//    //RUNNING state
-//    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
-//  
-//    // write output corresponding to map1 (This is just to validate that it is
-//    //no included in the output)
-//    writeBadOutput(task1Attempt1, conf);
-//    
-//    //send the done signal to the map
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            task1Attempt1.getID(),
-//            TaskAttemptEventType.TA_DONE));
-//    
-//    //wait for map task to complete
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//
-//    // Verify the shuffle-port
-//    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
-//
-//    //stop the app before the job completes.
-//    app.stop();
-//    
-//    //rerun
-//    //in rerun the map will be recovered from previous run
-//    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
-//        ++runCount);
-//    conf = new Configuration();
-//    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-//    conf.setBoolean("mapred.mapper.new-api", true);
-//    conf.setBoolean("mapred.reducer.new-api", true);
-//    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-//    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-//    job = app.submit(conf);
-//    app.waitForState(job, JobState.RUNNING);
-//    Assert.assertEquals("No of tasks not correct",
-//       3, job.getTasks().size());
-//    it = job.getTasks().values().iterator();
-//    mapTask1 = it.next();
-//    mapTask2 = it.next();
-//    reduceTask1 = it.next();
-//    
-//    // map will be recovered, no need to send done
-//    app.waitForState(mapTask1, TaskState.SUCCEEDED);
-//
-//    // Verify the shuffle-port after recovery
-//    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
-//    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
-//    
-//    app.waitForState(mapTask2, TaskState.RUNNING);
-//    
-//    TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
-//    .next();
-//
-//    //before sending the TA_DONE, event make sure attempt has come to 
-//    //RUNNING state
-//    app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
-//
-//    //send the done signal to the map
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            task2Attempt1.getID(),
-//            TaskAttemptEventType.TA_DONE));
-//
-//    //wait for map task to complete
-//    app.waitForState(mapTask2, TaskState.SUCCEEDED);
-//
-//    // Verify the shuffle-port
-//    Assert.assertEquals(5467, task2Attempt1.getShufflePort());
-//    
-//    app.waitForState(reduceTask1, TaskState.RUNNING);
-//    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
-//    
-//    // write output corresponding to reduce1
-//    writeOutput(reduce1Attempt1, conf);
-//    
-//    //send the done signal to the 1st reduce
-//    app.getContext().getEventHandler().handle(
-//        new TaskAttemptEvent(
-//            reduce1Attempt1.getID(),
-//            TaskAttemptEventType.TA_DONE));
-//
-//    //wait for first reduce task to complete
-//    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
-//    
-//    app.waitForState(job, JobState.SUCCEEDED);
-//    app.verifyCompleted();
-//    validateOutput();
-//  }
-//  
-//  private void writeBadOutput(TaskAttempt attempt, Configuration conf)
-//  throws Exception {
-//  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
-//      TypeConverter.fromYarn(attempt.getID()));
-//  
-//  TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
-//  RecordWriter theRecordWriter = theOutputFormat
-//      .getRecordWriter(tContext);
-//  
-//  NullWritable nullWritable = NullWritable.get();
-//  try {
-//    theRecordWriter.write(key2, val2);
-//    theRecordWriter.write(null, nullWritable);
-//    theRecordWriter.write(null, val2);
-//    theRecordWriter.write(nullWritable, val1);
-//    theRecordWriter.write(key1, nullWritable);
-//    theRecordWriter.write(key2, null);
-//    theRecordWriter.write(null, null);
-//    theRecordWriter.write(key1, val1);
-//  } finally {
-//    theRecordWriter.close(tContext);
-//  }
-//  
-//  OutputFormat outputFormat = ReflectionUtils.newInstance(
-//      tContext.getOutputFormatClass(), conf);
-//  OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
-//  committer.commitTask(tContext);
-//}
-//  
-//  
-//  private void writeOutput(TaskAttempt attempt, Configuration conf)
-//    throws Exception {
-//    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
-//        TypeConverter.fromYarn(attempt.getID()));
-//    
-//    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
-//    RecordWriter theRecordWriter = theOutputFormat
-//        .getRecordWriter(tContext);
-//    
-//    NullWritable nullWritable = NullWritable.get();
-//    try {
-//      theRecordWriter.write(key1, val1);
-//      theRecordWriter.write(null, nullWritable);
-//      theRecordWriter.write(null, val1);
-//      theRecordWriter.write(nullWritable, val2);
-//      theRecordWriter.write(key2, nullWritable);
-//      theRecordWriter.write(key1, null);
-//      theRecordWriter.write(null, null);
-//      theRecordWriter.write(key2, val2);
-//    } finally {
-//      theRecordWriter.close(tContext);
-//    }
-//    
-//    OutputFormat outputFormat = ReflectionUtils.newInstance(
-//        tContext.getOutputFormatClass(), conf);
-//    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
-//    committer.commitTask(tContext);
-//  }
-//
-//  private void validateOutput() throws IOException {
-//    File expectedFile = new File(new Path(outputDir, partFile).toString());
-//    StringBuffer expectedOutput = new StringBuffer();
-//    expectedOutput.append(key1).append('\t').append(val1).append("\n");
-//    expectedOutput.append(val1).append("\n");
-//    expectedOutput.append(val2).append("\n");
-//    expectedOutput.append(key2).append("\n");
-//    expectedOutput.append(key1).append("\n");
-//    expectedOutput.append(key2).append('\t').append(val2).append("\n");
-//    String output = slurp(expectedFile);
-//    Assert.assertEquals(output, expectedOutput.toString());
-//  }
-//
-//  public static String slurp(File f) throws IOException {
-//    int len = (int) f.length();
-//    byte[] buf = new byte[len];
-//    FileInputStream in = new FileInputStream(f);
-//    String contents = null;
-//    try {
-//      in.read(buf, 0, len);
-//      contents = new String(buf, "UTF-8");
-//    } finally {
-//      in.close();
-//    }
-//    return contents;
-//  }
-//
-//
-//  static class MRAppWithHistory extends MRApp {
-//    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
-//        String testName, boolean cleanOnStart, int startCount) {
-//      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
-//    }
-//
-//    @Override
-//    protected ContainerLauncher createContainerLauncher(AppContext context) {
-//      MockContainerLauncher launcher = new MockContainerLauncher() {
-//        @Override
-//        public void handle(ContainerLauncherEvent event) {
-//          TaskAttemptId taskAttemptID = event.getTaskAttemptID();
-//          // Pass everything except the 2nd attempt of the first task.
-//          if (taskAttemptID.getId() != 1
-//              || taskAttemptID.getTaskId().getId() != 0) {
-//            super.handle(event);
-//          }
-//        }
-//      };
-//      launcher.shufflePort = 5467;
-//      return launcher;
-//    }
-//
-//    @Override
-//    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
-//        AppContext context) {
-//      JobHistoryEventHandler2 eventHandler = new JobHistoryEventHandler2(context, 
-//          getStartCount());
-//      return eventHandler;
-//    }
-//  }
-//
-//  public static void main(String[] arg) throws Exception {
-//    TestRecovery test = new TestRecovery();
-//    test.testCrashed();
-//  }
-//}
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler2;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchFailed;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TestRecovery {
+
+  private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+  private static Path outputDir = new Path(new File("target", 
+      TestRecovery.class.getName()).getAbsolutePath() + 
+      Path.SEPARATOR + "out");
+  private static String partFile = "part-r-00000";
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+
+  /**
+   * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
+   * completely disappears because of failed launch, one attempt gets killed and
+   * one attempt succeeds. AM crashes after the first task finishes and
+   * recovers completely and succeeds in the second generation.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testCrashed() throws Exception {
+
+    int runCount = 0;
+    long am1StartTimeEst = System.currentTimeMillis();
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    long jobStartTime = job.getReport().getStartTime();
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    // NOTE(review): comment said reduces must be in NEW state, but the
+    // assert below checks RUNNING — confirm which state is intended
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    /////////// Play some games with the TaskAttempts of the first task //////
+    //send the fail signal to the 1st map task attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_FAIL_REQUEST));
+    
+    app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
+
+    int timeOut = 0;
+    while (mapTask1.getAttempts().size() < 2 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    // Additional attempts run in a separate thread. Possible for the 3rd
+    // attempt to be created before this check.
+    Assert.assertTrue(mapTask1.getAttempts().size() >=2);
+    Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    TaskAttempt task1Attempt2 = itr.next();
+    
+    // This attempt will automatically fail because of the way ContainerLauncher
+    // is setup
+    // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
+
+//    app.getContext().getEventHandler().handle(
+//        new TaskAttemptEventTerminated(task1Attempt2.getID()));
+    app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
+
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Assert.assertEquals(3, mapTask1.getAttempts().size());
+    itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    itr.next();
+    TaskAttempt task1Attempt3 = itr.next();
+    
+    app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+
+    //send the kill signal to the 1st map 3rd attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt3.getID(),
+            TaskAttemptEventType.TA_KILL_REQUEST));
+    
+    app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
+
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Assert.assertEquals(4, mapTask1.getAttempts().size());
+    itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    itr.next();
+    itr.next();
+    TaskAttempt task1Attempt4 = itr.next();
+    
+    app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 1st map 4th attempt
+    app.sendFinishToTaskAttempt(task1Attempt4.getID(),
+        TaskAttemptState.SUCCEEDED, true);
+
+    /////////// End of games with the TaskAttempts of the first task //////
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    long task1StartTime = mapTask1.getReport().getStartTime();
+    long task1FinishTime = mapTask1.getReport().getFinishTime();
+    
+    //stop the app
+    app.stop();
+
+    //rerun
+    //in rerun the 1st map will be recovered from previous run
+    long am2StartTimeEst = System.currentTimeMillis();
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+    
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    Assert.assertEquals("Job Start time not correct",
+        jobStartTime, job.getReport().getStartTime());
+    Assert.assertEquals("Task Start time not correct",
+        task1StartTime, mapTask1.getReport().getStartTime());
+    Assert.assertEquals("Task Finish time not correct",
+        task1FinishTime, mapTask1.getReport().getFinishTime());
+    Assert.assertEquals(2, job.getAMInfos().size());
+    int attemptNum = 1;
+    // Verify AMInfo
+    for (AMInfo amInfo : job.getAMInfos()) {
+      Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+          .getAttemptId());
+      Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+          .getApplicationAttemptId());
+      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    }
+    long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+    long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+    Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+        && am1StartTimeReal <= am2StartTimeEst);
+    Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+        && am2StartTimeReal <= System.currentTimeMillis());
+    // TODO Add verification of additional data from jobHistory - whatever was
+    // available in the failed attempt should be available here
+    
+    app.waitForAMExit();
+    validateAllContainersComplete(app.getContext());
+  }
+
+  @Test
+  public void testMultipleCrashes() throws Exception {
+
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    // NOTE(review): comment said reduces must be in NEW state, but the
+    // assert below checks RUNNING — confirm which state is intended
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    //send the done signal to the 1st map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+          task1Attempt1.getID(),
+          TaskAttemptEventType.TA_DONE));
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    // Crash the app
+    app.stop();
+
+    //rerun
+    //in rerun the 1st map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // Crash the app again.
+    app.stop();
+
+    //rerun
+    //in rerun the 1st and 2nd map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // The maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
+  @Test
+  public void testOutputRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+  
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+    
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
+    
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    //before sending the TA_DONE, event make sure attempt has come to 
+    //RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+    
+   //send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+
  /**
   * Tests AM-restart recovery when only map tasks completed in the first run:
   * the successful map (and its shuffle port, 5467) must be recovered without
   * re-running, while output committed by a map attempt before the stop must
   * not leak into the final job output (verified via validateOutput()).
   */
  @Test
  public void testOutputRecoveryMapsOnly() throws Exception {
    int runCount = 0;
    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
        true, ++runCount);
    Configuration conf = new Configuration();
    conf.setBoolean("mapred.mapper.new-api", true);
    conf.setBoolean("mapred.reducer.new-api", true);
    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
    Job job = app.submit(conf);
    app.waitForState(job, JobState.RUNNING);
    Assert.assertEquals("No of tasks not correct",
       3, job.getTasks().size());
    Iterator<Task> it = job.getTasks().values().iterator();
    Task mapTask1 = it.next();
    Task mapTask2 = it.next();
    Task reduceTask1 = it.next();

    // all maps must be running
    app.waitForState(mapTask1, TaskState.RUNNING);

    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
        .next();

    // before sending the TA_DONE event, make sure the attempt has reached
    // RUNNING state
    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);

    // write output corresponding to map1 (this is just to validate that it is
    // not included in the final job output)
    writeBadOutput(task1Attempt1, conf);

    //send the done signal to the map
    app.getContext().getEventHandler().handle(
        new TaskAttemptEvent(
            task1Attempt1.getID(),
            TaskAttemptEventType.TA_DONE));

    //wait for map task to complete
    app.waitForState(mapTask1, TaskState.SUCCEEDED);

    // Verify the shuffle-port (fixed at 5467 by MRAppWithHistory's launcher)
    Assert.assertEquals(5467, task1Attempt1.getShufflePort());

    //stop the app before the job completes.
    app.stop();

    //rerun
    //in rerun the map will be recovered from previous run
    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
        ++runCount);
    conf = new Configuration();
    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
    conf.setBoolean("mapred.mapper.new-api", true);
    conf.setBoolean("mapred.reducer.new-api", true);
    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
    job = app.submit(conf);
    app.waitForState(job, JobState.RUNNING);
    Assert.assertEquals("No of tasks not correct",
       3, job.getTasks().size());
    it = job.getTasks().values().iterator();
    mapTask1 = it.next();
    mapTask2 = it.next();
    reduceTask1 = it.next();

    // map will be recovered, no need to send done
    app.waitForState(mapTask1, TaskState.SUCCEEDED);

    // Verify the shuffle-port after recovery
    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
    Assert.assertEquals(5467, task1Attempt1.getShufflePort());

    app.waitForState(mapTask2, TaskState.RUNNING);

    TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
    .next();

    // before sending the TA_DONE event, make sure the attempt has reached
    // RUNNING state
    app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);

    //send the done signal to the map
    app.getContext().getEventHandler().handle(
        new TaskAttemptEvent(
            task2Attempt1.getID(),
            TaskAttemptEventType.TA_DONE));

    //wait for map task to complete
    app.waitForState(mapTask2, TaskState.SUCCEEDED);

    // Verify the shuffle-port
    Assert.assertEquals(5467, task2Attempt1.getShufflePort());

    app.waitForState(reduceTask1, TaskState.RUNNING);
    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();

    // write output corresponding to reduce1
    writeOutput(reduce1Attempt1, conf);

    //send the done signal to the 1st reduce
    app.getContext().getEventHandler().handle(
        new TaskAttemptEvent(
            reduce1Attempt1.getID(),
            TaskAttemptEventType.TA_DONE));

    //wait for first reduce task to complete
    app.waitForState(reduceTask1, TaskState.SUCCEEDED);

    app.waitForState(job, JobState.SUCCEEDED);
    app.verifyCompleted();
    validateOutput();
  }
+
+  private void writeBadOutput(TaskAttempt attempt, Configuration conf)
+  throws Exception {
+  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
+      TypeConverter.fromYarn(attempt.getID()));
+  
+  TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+  RecordWriter theRecordWriter = theOutputFormat
+      .getRecordWriter(tContext);
+  
+  NullWritable nullWritable = NullWritable.get();
+  try {
+    theRecordWriter.write(key2, val2);
+    theRecordWriter.write(null, nullWritable);
+    theRecordWriter.write(null, val2);
+    theRecordWriter.write(nullWritable, val1);
+    theRecordWriter.write(key1, nullWritable);
+    theRecordWriter.write(key2, null);
+    theRecordWriter.write(null, null);
+    theRecordWriter.write(key1, val1);
+  } finally {
+    theRecordWriter.close(tContext);
+  }
+  
+  OutputFormat outputFormat = ReflectionUtils.newInstance(
+      tContext.getOutputFormatClass(), conf);
+  OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+  committer.commitTask(tContext);
+}
+  
+  
+  private void writeOutput(TaskAttempt attempt, Configuration conf)
+    throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
+        TypeConverter.fromYarn(attempt.getID()));
+    
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+    
+    NullWritable nullWritable = NullWritable.get();
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(tContext);
+    }
+    
+    OutputFormat outputFormat = ReflectionUtils.newInstance(
+        tContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+    committer.commitTask(tContext);
+  }
+
+  private void validateOutput() throws IOException {
+    File expectedFile = new File(new Path(outputDir, partFile).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = slurp(expectedFile);
+    Assert.assertEquals(output, expectedOutput.toString());
+  }
+
+  public static String slurp(File f) throws IOException {
+    int len = (int) f.length();
+    byte[] buf = new byte[len];
+    FileInputStream in = new FileInputStream(f);
+    String contents = null;
+    try {
+      in.read(buf, 0, len);
+      contents = new String(buf, "UTF-8");
+    } finally {
+      in.close();
+    }
+    return contents;
+  }
+
+  void validateAllContainersComplete(AppContext context) {
+    for (AMContainer container : context.getAllContainers().values()) {
+      Assert.assertEquals("Container: " + container.getContainerId()
+          + " in unexpected state: " + container.getState(),
+          AMContainerState.COMPLETED, container.getState());
+    }
+  }
+
+
  /**
   * MRApp variant for recovery tests. It (a) wires in the real
   * JobHistoryEventHandler2 so events are persisted and can be replayed after
   * an AM restart, (b) forces a container-launch failure for the 2nd attempt
   * (attempt id 1) of the first task (task id 0), and (c) reports a fixed
   * shuffle port (5467) so tests can assert the port survives recovery.
   */
  static class MRAppWithHistory extends MRApp {
    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
        String testName, boolean cleanOnStart, int startCount) {
      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
    }

    @Override
    protected ContainerLauncher createContainerLauncher(final AppContext context) {
      MockContainerLauncher launcher = new MockContainerLauncher() {
        @Override
        public void handle(NMCommunicatorEvent event) {
          if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
            // Look up the attempt queued on the container being launched.
            AMContainer amContainer = getContext().getAllContainers().get(
                event.getContainerId());
            TaskAttemptId attemptIdForContainer = amContainer
                .getQueuedTaskAttempts().iterator().next();
            // Pass everything except the 2nd attempt of the first task.
            if (attemptIdForContainer.getId() != 1
                || attemptIdForContainer.getTaskId().getId() != 0) {
              super.handle(event);
            } else {
              // Simulate a launch failure for that single attempt instead of
              // launching the container.
              context.getEventHandler().handle(
                  new AMContainerEventLaunchFailed(event.getContainerId(),
                      "Forcing failure"));
            }
          } else {
            super.handle(event);
          }
        }
      };
      // Fixed shuffle port reported by the mock launcher; recovery tests
      // assert this exact value before and after AM restart.
      launcher.shufflePort = 5467;
      return launcher;
    }

    @Override
    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
        AppContext context) {
      // Real history handler (not a mock): persists events keyed by the AM
      // start count so a restarted AM can recover completed tasks.
      JobHistoryEventHandler2 eventHandler = new JobHistoryEventHandler2(context, 
          getStartCount());
      return eventHandler;
    }
  }
+
+  public static void main(String[] arg) throws Exception {
+    TestRecovery test = new TestRecovery();
+    test.testCrashed();
+  }
+}

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java Fri Sep 14 00:41:00 2012
@@ -431,7 +431,8 @@ public class TestJobImpl {
 
   private void finishTask(MRApp mrApp, Task task) throws Exception {
     TaskAttempt attempt = task.getAttempts().values().iterator().next();
-    mrApp.sendFinishToTaskAttempt(attempt, TaskAttemptState.SUCCEEDED);
+    mrApp.sendFinishToTaskAttempt(attempt.getID(), TaskAttemptState.SUCCEEDED,
+        false);
   }
   private boolean testUberDecision(Configuration conf) {
     JobID jobID = JobID.forName("job_1234567890000_0001");

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java Fri Sep 14 00:41:00 2012
@@ -761,7 +761,7 @@ public class TestRMContainerAllocator {
     Resource resource = BuilderUtils.newResource(memory);
     Container container = BuilderUtils.newContainer(containerId, nodeId, host
         + ":8000", resource, priority, null);
-    appContext.getAllContainers().addNewContainer(container);
+    appContext.getAllContainers().addContainerIfNew(container);
     return container;
   }
   



Mime
View raw message