hadoop-mapreduce-commits mailing list archives

From: ss...@apache.org
Subject: svn commit: r1384614 [1/2] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java...
Date: Fri, 14 Sep 2012 00:41:01 GMT
Author: sseth
Date: Fri Sep 14 00:41:00 2012
New Revision: 1384614

URL: http://svn.apache.org/viewvc?rev=1384614&view=rev
Log:
MAPREDUCE-4617. Re-wire AM Recovery (sseth)

Added:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java
Removed:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContaienrCompleted.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventReleased.java
Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestRecovery.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TestJobImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerAllocator.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Sep 14 00:41:00 2012
@@ -16,3 +16,5 @@ Branch MR-3902
   MAPREDUCE-4620. RMContainerAllocator should factor in nodes being blacklisted. (sseth)
 
   MAPREDUCE-4626. Fix and re-enable RMContainerAllocator unit tests (sseth)
+
+  MAPREDUCE-4617. Re-wire AM Recovery (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java Fri Sep 14 00:41:00 2012
@@ -420,6 +420,7 @@ public class TaskAttemptListenerImpl2 ex
 
     // A rough imitation of code from TaskTracker.
 
+    // TODO XXX: Does ContainerHeartbeatHandler need to be pinged on getTask() ? 
     JVMId jvmId = jvmContext.jvmId;
     LOG.info("ZZZ: JVM with ID : " + jvmId + " asked for a task");
     

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/MRAppMaster.java Fri Sep 14 00:41:00 2012
@@ -70,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app2.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app2.recover.Recovery;
+import org.apache.hadoop.mapreduce.v2.app2.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
@@ -81,7 +82,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
-import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNode;
 import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
 import org.apache.hadoop.mapreduce.v2.app2.speculate.DefaultSpeculator;
@@ -106,7 +106,6 @@ import org.apache.hadoop.yarn.api.Applic
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -585,10 +584,7 @@ public class MRAppMaster extends Composi
    * @return an instance of the recovery service.
    */
   protected Recovery createRecoveryService(AppContext appContext) {
-//    return new RecoveryService(appContext.getApplicationAttemptId(),
-//        appContext.getClock(), getCommitter());
-    // TODO XXX Uncomment after fixing RecoveryService
-    return null;
+    return new RecoveryService(appContext, getCommitter());
   }
   
   /**

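For context, createRecoveryService() is a protected factory hook, so the re-wired RecoveryService can be swapped out (for example by a test harness). A minimal no-op sketch follows; the Recovery interface is inferred from the @Override methods RecoveryService implements in this patch, and the class name is illustrative, not part of this commit:

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app2.recover.Recovery;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;

// Hypothetical stand-in, not part of this commit: a Recovery that
// recovers nothing, matching the pre-commit "return null" behavior
// without forcing callers to null-check.
public class NoOpRecovery implements Recovery {

  private final Dispatcher dispatcher = new AsyncDispatcher();
  private final Clock clock = new SystemClock();

  @Override
  public Dispatcher getDispatcher() {
    return dispatcher; // plain dispatcher, no event interception
  }

  @Override
  public Clock getClock() {
    return clock; // no clock rewinding during replay
  }

  @Override
  public Map<TaskId, TaskInfo> getCompletedTasks() {
    return Collections.emptyMap(); // nothing to recover
  }

  @Override
  public List<AMInfo> getAMInfos() {
    return Collections.emptyList(); // no previous AM attempts
  }
}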
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java?rev=1384614&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/event/TaskAttemptEventFailRequest.java Fri Sep 14 00:41:00 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptEventFailRequest extends TaskAttemptEvent {
+
+  private final String message;
+
+  public TaskAttemptEventFailRequest(TaskAttemptId id, String message) {
+    super(id, TaskAttemptEventType.TA_FAIL_REQUEST);
+    this.message = message;
+  }
+
+  public String getMessage() {
+    return this.message;
+  }
+
+}

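TaskAttemptEventFailRequest pins the event type to TA_FAIL_REQUEST and carries a diagnostic message alongside it, retrievable via getMessage(). A minimal dispatch sketch (the helper class and names are illustrative; the RecoveryService change below sends the event the same way):

import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
import org.apache.hadoop.yarn.event.EventHandler;

// Illustrative helper, not part of this commit.
public class FailRequestExample {
  // Ask the TaskAttempt state machine to fail an attempt; the reason
  // string travels with the event via getMessage().
  static void failAttempt(EventHandler<TaskAttemptEvent> handler,
      TaskAttemptId attemptId, String reason) {
    handler.handle(new TaskAttemptEventFailRequest(attemptId, reason));
  }
}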
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskAttemptImpl.java Fri Sep 14 00:41:00 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.job.impl;
 
 import java.net.InetSocketAddress;
@@ -31,6 +49,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -230,6 +249,7 @@ public abstract class TaskAttemptImpl im
     this.resourceCapability = BuilderUtils.newResource(getMemoryRequired(conf,
         taskId.getTaskType()));
     this.reportedStatus = new TaskAttemptStatus();
+    initTaskAttemptStatus(reportedStatus);
     RackResolver.init(conf);
     synchronized(stateMachineFactory) {
       if (!stateMachineInited) {
@@ -744,6 +764,9 @@ public abstract class TaskAttemptImpl im
       // the container instead.
       ta.maybeSendSpeculatorContainerRequest();
 
+      // TODO XXX: Creating the remote task here may not be required in case of recovery.
+      // Could be solved by having the Scheduler pull the RemoteTask.
+      
       // Create the remote task.
       org.apache.hadoop.mapred.Task remoteTask = ta.createRemoteTask();
       // Create startTaskRequest
@@ -896,7 +919,7 @@ public abstract class TaskAttemptImpl im
       // TODO jvmId only required if TAL registration happens here.
       // TODO Anything to be done with the TaskAttemptListener ? or is that in
       // the Container.
-
+ 
       ta.launchTime = ta.clock.getTime();
       ta.shufflePort = event.getShufflePort();
 
@@ -1136,6 +1159,10 @@ public abstract class TaskAttemptImpl im
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       super.transition(ta, event);
       ta.sendTaskAttemptCleanupEvent();
+      // XXX: This may need some additional handling.
+      // TaskAttempt failed after SUCCESS -> the container should also be STOPPED if it's still RUNNING.
+      // Inform the Scheduler.
+      // XXX: Also maybe change the outgoing Task Attempt to be FETCH_FAILURE related.
       // TODO XXX: Any counter updates. 
     }
   }
@@ -1171,6 +1198,16 @@ public abstract class TaskAttemptImpl im
       super.transition(ta, event);
     }
   }
+  
+  private void initTaskAttemptStatus(TaskAttemptStatus result) {
+    result.progress = 0.0f;
+    result.phase = Phase.STARTING;
+    result.stateString = "NEW";
+    result.taskState = TaskAttemptState.NEW;
+    Counters counters = EMPTY_COUNTERS;
+    //    counters.groups = new HashMap<String, CounterGroup>();
+    result.counters = counters;
+  }
 
   // TODO Consolidate all the Failed / Killed methods which only differ in state.
   // Move some of the functionality out to helpers, instead of extending non-related event classes.

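Initializing reportedStatus in the constructor means progress, phase, state, and counters are populated from the moment the attempt object exists, rather than only after the first real status update. For reference, a status update is delivered as sketched below, mirroring sendStatusUpdateEvent() in the RecoveryService rewrite further down (helper and variable names are illustrative):

import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.yarn.event.EventHandler;

// Illustrative helper, not part of this commit.
public class StatusUpdateExample {
  static void sendFinalStatus(EventHandler<TaskAttemptStatusUpdateEvent> handler,
      TaskAttemptId attemptId, String stateString) {
    TaskAttemptStatus status = new TaskAttemptStatus();
    status.id = attemptId;
    status.progress = 1.0f;       // recovered attempts are complete
    status.phase = Phase.CLEANUP; // the phase RecoveryService reports
    status.stateString = stateString;
    handler.handle(new TaskAttemptStatusUpdateEvent(status.id, status));
  }
}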
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/job/impl/TaskImpl.java Fri Sep 14 00:41:00 2012
@@ -859,12 +859,12 @@ public abstract class TaskImpl implement
       if (attempt.getAssignedContainerMgrAddress() != null) {
         //container was assigned
        // TODO XXX: What else changes other than this one transition.
-        // TODO XXX: Get rid of all the old events, types, etc temporarily.
         
         // This can originate from TOO_MANY_FETCH_FAILURES -> the Container may still be running. Ask the scheduler to KILL it.
-        // TODO XXX ZZZ: Send out a TA_STOP_REQUEST. or the Task sends this out directly, considering the TaskAttempt may already have completed.
+        // TODO XXX: Send out a TA_STOP_REQUEST. or the Task sends this out directly, considering the TaskAttempt may already have completed.
 //        task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(), 
 //            attempt.getAssignedContainerMgrAddress()));
+        // TODO XXX: This is not required here. TaskAttempt should be sending out the STOP_REQUEST
         task.eventHandler.handle(new AMSchedulerTAStopRequestEvent(castEvent.getTaskAttemptID(), true));
       }
       

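What survives the TODO churn in this hunk is the scheduler-facing stop request. A dispatch sketch (illustrative helper, not part of this commit; the boolean's exact semantics are not visible in this hunk):

import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
import org.apache.hadoop.yarn.event.EventHandler;

// Illustrative helper: ask the scheduler to stop the container that
// may still be running an attempt, e.g. after TOO_MANY_FETCH_FAILURES.
public class StopRequestExample {
  static void requestStop(EventHandler<AMSchedulerTAStopRequestEvent> handler,
      TaskAttemptId attemptId) {
    // The second argument is passed as 'true' in the TaskImpl
    // transition above; its meaning is not shown in this hunk.
    handler.handle(new AMSchedulerTAStopRequestEvent(attemptId, true));
  }
}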
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java Fri Sep 14 00:41:00 2012
@@ -1,447 +1,602 @@
-///**
-//* Licensed to the Apache Software Foundation (ASF) under one
-//* or more contributor license agreements.  See the NOTICE file
-//* distributed with this work for additional information
-//* regarding copyright ownership.  The ASF licenses this file
-//* to you under the Apache License, Version 2.0 (the
-//* "License"); you may not use this file except in compliance
-//* with the License.  You may obtain a copy of the License at
-//*
-//*     http://www.apache.org/licenses/LICENSE-2.0
-//*
-//* Unless required by applicable law or agreed to in writing, software
-//* distributed under the License is distributed on an "AS IS" BASIS,
-//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//* See the License for the specific language governing permissions and
-//* limitations under the License.
-//*/
-//
-//package org.apache.hadoop.mapreduce.v2.app2.recover;
-//
-//import java.io.IOException;
-//import java.util.ArrayList;
-//import java.util.HashMap;
-//import java.util.LinkedList;
-//import java.util.List;
-//import java.util.Map;
-//
-//import org.apache.commons.logging.Log;
-//import org.apache.commons.logging.LogFactory;
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.fs.FSDataInputStream;
-//import org.apache.hadoop.fs.FileContext;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.mapreduce.MRJobConfig;
-//import org.apache.hadoop.mapreduce.OutputCommitter;
-//import org.apache.hadoop.mapreduce.TaskAttemptContext;
-//import org.apache.hadoop.mapreduce.TaskAttemptID;
-//import org.apache.hadoop.mapreduce.TaskType;
-//import org.apache.hadoop.mapreduce.TypeConverter;
-//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
-//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
-//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
-//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
-//import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-//import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
-//import org.apache.hadoop.mapreduce.v2.api.records.Phase;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-//import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-//import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobDiagnosticsUpdateEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerLaunchedEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
-//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
-//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerRemoteLaunchEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
-//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocatorEvent;
-//import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
-//import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
-//import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-//import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
-//import org.apache.hadoop.yarn.Clock;
-//import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-//import org.apache.hadoop.yarn.api.records.Container;
-//import org.apache.hadoop.yarn.api.records.ContainerId;
-//import org.apache.hadoop.yarn.api.records.NodeId;
-//import org.apache.hadoop.yarn.event.AsyncDispatcher;
-//import org.apache.hadoop.yarn.event.Dispatcher;
-//import org.apache.hadoop.yarn.event.Event;
-//import org.apache.hadoop.yarn.event.EventHandler;
-//import org.apache.hadoop.yarn.service.CompositeService;
-//import org.apache.hadoop.yarn.service.Service;
-//import org.apache.hadoop.yarn.util.BuilderUtils;
-//import org.apache.hadoop.yarn.util.ConverterUtils;
-//
-///*
-// * Recovers the completed tasks from the previous life of Application Master.
-// * The completed tasks are deciphered from the history file of the previous life.
-// * Recovery service intercepts and replay the events for completed tasks.
-// * While recovery is in progress, the scheduling of new tasks are delayed by 
-// * buffering the task schedule events.
-// * The recovery service controls the clock while recovery is in progress.
-// */
-//
-////TODO:
-////task cleanup for all non completed tasks
-//public class RecoveryService extends CompositeService implements Recovery {
-//
-//  private static final Log LOG = LogFactory.getLog(RecoveryService.class);
-//
-//  private final ApplicationAttemptId applicationAttemptId;
-//  private final OutputCommitter committer;
-//  private final Dispatcher dispatcher;
-//  private final ControlledClock clock;
-//
-//  private JobInfo jobInfo = null;
-//  private final Map<TaskId, TaskInfo> completedTasks =
-//    new HashMap<TaskId, TaskInfo>();
-//
-//  private final List<TaskEvent> pendingTaskScheduleEvents =
-//    new ArrayList<TaskEvent>();
-//
-//  private volatile boolean recoveryMode = false;
-//
-//  public RecoveryService(ApplicationAttemptId applicationAttemptId, 
-//      Clock clock, OutputCommitter committer) {
-//    super("RecoveringDispatcher");
-//    this.applicationAttemptId = applicationAttemptId;
-//    this.committer = committer;
-//    this.dispatcher = createRecoveryDispatcher();
-//    this.clock = new ControlledClock(clock);
-//      addService((Service) dispatcher);
-//  }
-//
-//  @Override
-//  public void init(Configuration conf) {
-//    super.init(conf);
-//    // parse the history file
-//    try {
-//      parse();
-//    } catch (Exception e) {
-//      LOG.warn(e);
-//      LOG.warn("Could not parse the old history file. Aborting recovery. "
-//          + "Starting afresh.", e);
-//    }
-//    if (completedTasks.size() > 0) {
-//      recoveryMode = true;
-//      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
-//          + "TO RECOVER " + completedTasks.size());
-//      LOG.info("Job launch time " + jobInfo.getLaunchTime());
-//      clock.setTime(jobInfo.getLaunchTime());
-//    }
-//  }
-//
-//  @Override
-//  public Dispatcher getDispatcher() {
-//    return dispatcher;
-//  }
-//
-//  @Override
-//  public Clock getClock() {
-//    return clock;
-//  }
-//
-//  @Override
-//  public Map<TaskId, TaskInfo> getCompletedTasks() {
-//    return completedTasks;
-//  }
-//
-//  @Override
-//  public List<AMInfo> getAMInfos() {
-//    if (jobInfo == null || jobInfo.getAMInfos() == null) {
-//      return new LinkedList<AMInfo>();
-//    }
-//    List<AMInfo> amInfos = new LinkedList<AMInfo>();
-//    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
-//        .getAMInfos()) {
-//      AMInfo amInfo =
-//          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
-//              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
-//              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
-//              jhAmInfo.getNodeManagerHttpPort());
-//
-//      amInfos.add(amInfo);
-//    }
-//    return amInfos;
-//  }
-//
-//  private void parse() throws IOException {
-//    // TODO: parse history file based on startCount
-//    String jobName = 
-//        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
-//    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
-//    FSDataInputStream in = null;
-//    Path historyFile = null;
-//    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
-//        new Path(jobhistoryDir));
-//    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
-//        getConfig());
-//    //read the previous history file
-//    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
-//        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));  
-//    LOG.info("History file is at " + historyFile);
-//    in = fc.open(historyFile);
-//    JobHistoryParser parser = new JobHistoryParser(in);
-//    jobInfo = parser.parse();
-//    Exception parseException = parser.getParseException();
-//    if (parseException != null) {
-//      LOG.info("Got an error parsing job-history file " + historyFile + 
-//          ", ignoring incomplete events.", parseException);
-//    }
-//    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
-//        .getAllTasks();
-//    for (TaskInfo taskInfo : taskInfos.values()) {
-//      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
-//        completedTasks
-//            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
-//        LOG.info("Read from history task "
-//            + TypeConverter.toYarn(taskInfo.getTaskId()));
-//      }
-//    }
-//    LOG.info("Read completed tasks from history "
-//        + completedTasks.size());
-//  }
-//  
-//  protected Dispatcher createRecoveryDispatcher() {
-//    return new RecoveryDispatcher();
-//  }
-//
-//  @SuppressWarnings("rawtypes")
-//  class RecoveryDispatcher extends AsyncDispatcher {
-//    private final EventHandler actualHandler;
-//    private final EventHandler handler;
-//
-//    RecoveryDispatcher() {
-//      super();
-//      actualHandler = super.getEventHandler();
-//      handler = new InterceptingEventHandler(actualHandler);
-//    }
-//
-//    @Override
-//    @SuppressWarnings("unchecked")
-//    public void dispatch(Event event) {
-//      if (recoveryMode) {
-//        if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
-//          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
-//              .getTaskAttemptID());
-//          LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
-//          clock.setTime(attInfo.getStartTime());
-//
-//        } else if (event.getType() == TaskAttemptEventType.TA_DONE
-//            || event.getType() == TaskAttemptEventType.TA_FAILMSG
-//            || event.getType() == TaskAttemptEventType.TA_KILL) {
-//          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
-//              .getTaskAttemptID());
-//          LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
-//          clock.setTime(attInfo.getFinishTime());
-//        }
-//
-//        else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
-//            || event.getType() == TaskEventType.T_ATTEMPT_KILLED
-//            || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
-//          TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
-//          LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
-//          TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
-//              .getTaskId());
-//          taskInfo.getAllTaskAttempts().remove(
-//              TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
-//          // remove the task info from completed tasks if all attempts are
-//          // recovered
-//          if (taskInfo.getAllTaskAttempts().size() == 0) {
-//            completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
-//            // checkForRecoveryComplete
-//            LOG.info("CompletedTasks() " + completedTasks.size());
-//            if (completedTasks.size() == 0) {
-//              recoveryMode = false;
-//              clock.reset();
-//              LOG.info("Setting the recovery mode to false. " +
-//                 "Recovery is complete!");
-//
-//              // send all pending tasks schedule events
-//              for (TaskEvent tEv : pendingTaskScheduleEvents) {
-//                actualHandler.handle(tEv);
-//              }
-//
-//            }
-//          }
-//        }
-//      }
-//      realDispatch(event);
-//    }
-//    
-//    public void realDispatch(Event event) {
-//      super.dispatch(event);
-//    }
-//
-//    @Override
-//    public EventHandler getEventHandler() {
-//      return handler;
-//    }
-//  }
-//
-//  private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
-//    TaskInfo taskInfo = completedTasks.get(id.getTaskId());
-//    return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
-//  }
-//
-//  @SuppressWarnings({"rawtypes", "unchecked"})
-//  private class InterceptingEventHandler implements EventHandler {
-//    EventHandler actualHandler;
-//
-//    InterceptingEventHandler(EventHandler actualHandler) {
-//      this.actualHandler = actualHandler;
-//    }
-//
-//    @Override
-//    public void handle(Event event) {
-//      if (!recoveryMode) {
-//        // delegate to the dispatcher one
-//        actualHandler.handle(event);
-//        return;
-//      }
-//
-//      else if (event.getType() == TaskEventType.T_SCHEDULE) {
-//        TaskEvent taskEvent = (TaskEvent) event;
-//        // delay the scheduling of new tasks till previous ones are recovered
-//        if (completedTasks.get(taskEvent.getTaskID()) == null) {
-//          LOG.debug("Adding to pending task events "
-//              + taskEvent.getTaskID());
-//          pendingTaskScheduleEvents.add(taskEvent);
-//          return;
-//        }
-//      }
-//
-//      else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
-//        TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
-//        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-//        LOG.debug("CONTAINER_REQ " + aId);
-//        sendAssignedEvent(aId, attInfo);
-//        return;
-//      }
-//
-//      else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
-//        TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
-//        LOG.debug("TASK_CLEAN");
-//        actualHandler.handle(new TaskAttemptEvent(aId,
-//            TaskAttemptEventType.TA_CLEANUP_DONE));
-//        return;
-//      }
-//
-//      else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
-//        TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
-//            .getTaskAttemptID();
-//        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-//        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
-//            attInfo.getShufflePort()));
-//        // send the status update event
-//        sendStatusUpdateEvent(aId, attInfo);
-//
-//        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
-//        switch (state) {
-//        case SUCCEEDED:
-//          //recover the task output
-//          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
-//              attInfo.getAttemptId());
-//          try { 
-//            TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
-//            int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 
-//            if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
-//              committer.recoverTask(taskContext);
-//              LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
-//            } else {
-//              LOG.info("Will not try to recover output for "
-//                  + taskContext.getTaskAttemptID());
-//            }
-//          } catch (IOException e) {
-//            LOG.error("Caught an exception while trying to recover task "+aId, e);
-//            actualHandler.handle(new JobDiagnosticsUpdateEvent(
-//                aId.getTaskId().getJobId(), "Error in recovering task output " + 
-//                e.getMessage()));
-//            actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
-//                JobEventType.INTERNAL_ERROR));
-//          }
-//          
-//          // send the done event
-//          LOG.info("Sending done event to recovered attempt " + aId);
-//          actualHandler.handle(new TaskAttemptEvent(aId,
-//              TaskAttemptEventType.TA_DONE));
-//          break;
-//        case KILLED:
-//          LOG.info("Sending kill event to recovered attempt " + aId);
-//          actualHandler.handle(new TaskAttemptEvent(aId,
-//              TaskAttemptEventType.TA_KILL));
-//          break;
-//        default:
-//          LOG.info("Sending fail event to recovered attempt " + aId);
-//          actualHandler.handle(new TaskAttemptEvent(aId,
-//              TaskAttemptEventType.TA_FAILMSG));
-//          break;
-//        }
-//        return;
-//      }
-//
-//      else if (event.getType() == 
-//        ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
-//        TaskAttemptId aId = ((ContainerLauncherEvent) event)
-//          .getTaskAttemptID();
-//        actualHandler.handle(
-//           new TaskAttemptEvent(aId,
-//                TaskAttemptEventType.TA_CONTAINER_CLEANED));
-//        return;
-//      }
-//
-//      // delegate to the actual handler
-//      actualHandler.handle(event);
-//    }
-//
-//    private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
-//        TaskAttemptInfo attemptInfo) {
-//      LOG.info("Sending status update event to " + yarnAttemptID);
-//      TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
-//      taskAttemptStatus.id = yarnAttemptID;
-//      taskAttemptStatus.progress = 1.0f;
-//      taskAttemptStatus.stateString = attemptInfo.getTaskStatus(); 
-//      // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
-//      taskAttemptStatus.phase = Phase.CLEANUP;
-//      org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters();
-//      if (cntrs == null) {
-//        taskAttemptStatus.counters = null;
-//      } else {
-//        taskAttemptStatus.counters = cntrs;
-//      }
-//      actualHandler.handle(new TaskAttemptStatusUpdateEvent(
-//          taskAttemptStatus.id, taskAttemptStatus));
-//    }
-//
-//    private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
-//        TaskAttemptInfo attemptInfo) {
-//      LOG.info("Sending assigned event to " + yarnAttemptID);
-//      ContainerId cId = attemptInfo.getContainerId();
-//
-//      NodeId nodeId =
-//          ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
-//              + attemptInfo.getPort());
-//      // Resource/Priority/ApplicationACLs are only needed while launching the
-//      // container on an NM, these are already completed tasks, so setting them
-//      // to null
-//      Container container = BuilderUtils.newContainer(cId, nodeId,
-//          attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
-//          null, null, null);
-//      actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
-//          container, null));
-//    }
-//  }
-//
-//}
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.recover;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerTASucceededEvent;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/*
+ * Recovers the completed tasks from the previous life of the Application Master.
+ * The completed tasks are deciphered from the history file of the previous life.
+ * The recovery service intercepts and replays the events for completed tasks.
+ * While recovery is in progress, the scheduling of new tasks is delayed by
+ * buffering the task schedule events.
+ * The recovery service controls the clock while recovery is in progress.
+ */
+
+//TODO:
+//task cleanup for all non completed tasks
+public class RecoveryService extends CompositeService implements Recovery {
+
+  private static final Log LOG = LogFactory.getLog(RecoveryService.class);
+
+  private final ApplicationAttemptId applicationAttemptId;
+  private final OutputCommitter committer;
+  private final Dispatcher dispatcher;
+  private final ControlledClock clock;
+  private final AppContext appContext;
+
+  private JobInfo jobInfo = null;
+  private final Map<TaskId, TaskInfo> completedTasks =
+    new HashMap<TaskId, TaskInfo>();
+
+  private final List<TaskEvent> pendingTaskScheduleEvents =
+    new ArrayList<TaskEvent>();
+  private Map<ContainerId, ContainerInfo> containerInfo =
+      new HashMap<ContainerId, ContainerInfo>();
+  private Map<TaskAttemptId, ContainerId> attemptToContainerMap =
+      new HashMap<TaskAttemptId, ContainerId>();
+
+  private volatile boolean recoveryMode = false;
+
+  public RecoveryService(AppContext appContext, OutputCommitter committer) {
+    super("RecoveringDispatcher");
+    this.appContext = appContext;
+    this.applicationAttemptId = appContext.getApplicationAttemptId();
+    this.committer = committer;
+    this.dispatcher = createRecoveryDispatcher();
+    this.clock = new ControlledClock(appContext.getClock());
+    addService((Service) dispatcher);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // parse the history file
+    try {
+      parse();
+    } catch (Exception e) {
+      LOG.warn(e);
+      LOG.warn("Could not parse the old history file. Aborting recovery. "
+          + "Starting afresh.", e);
+    }
+    if (completedTasks.size() > 0) {
+      recoveryMode = true;
+      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+          + "TO RECOVER " + completedTasks.size());
+      LOG.info("Job launch time " + jobInfo.getLaunchTime());
+      clock.setTime(jobInfo.getLaunchTime());
+    }
+  }
+
+  @Override
+  public Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  @Override
+  public Clock getClock() {
+    return clock;
+  }
+
+  @Override
+  public Map<TaskId, TaskInfo> getCompletedTasks() {
+    return completedTasks;
+  }
+
+  @Override
+  public List<AMInfo> getAMInfos() {
+    if (jobInfo == null || jobInfo.getAMInfos() == null) {
+      return new LinkedList<AMInfo>();
+    }
+    List<AMInfo> amInfos = new LinkedList<AMInfo>();
+    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
+        .getAMInfos()) {
+      AMInfo amInfo =
+          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+              jhAmInfo.getNodeManagerHttpPort());
+
+      amInfos.add(amInfo);
+    }
+    return amInfos;
+  }
+
+  private void parse() throws IOException {
+    // TODO: parse history file based on startCount
+    String jobName = 
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
+    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
+    FSDataInputStream in = null;
+    Path historyFile = null;
+    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
+        new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
+        getConfig());
+    //read the previous history file
+    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));  
+    LOG.info("History file is at " + historyFile);
+    in = fc.open(historyFile);
+    JobHistoryParser parser = new JobHistoryParser(in);
+    jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file " + historyFile + 
+          ", ignoring incomplete events.", parseException);
+    }
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+        .getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        completedTasks
+            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+        LOG.info("Read from history task "
+            + TypeConverter.toYarn(taskInfo.getTaskId()));
+      }
+    }
+    LOG.info("Read completed tasks from history "
+        + completedTasks.size());
+  }
+  
+  protected Dispatcher createRecoveryDispatcher() {
+    return new RecoveryDispatcher();
+  }
+
+  @SuppressWarnings("rawtypes")
+  class RecoveryDispatcher extends AsyncDispatcher {
+    // Intercept events as they are drained from the queue, so ordering is taken into account.
+    private final EventHandler actualHandler;
+    private final EventHandler handler;
+
+    RecoveryDispatcher() {
+      super();
+      actualHandler = super.getEventHandler();
+      handler = new InterceptingEventHandler(actualHandler);
+    }
+
+    
+    @Override
+    public void dispatch(Event event) {
+      if (recoveryMode) {
+        if (event.getType() == TaskAttemptEventType.TA_STARTED_REMOTELY) {
+          // These events are split between the intercepted handle() call, and
+          // just before the dispatch.
+          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+              .getTaskAttemptID());
+          LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
+          clock.setTime(attInfo.getStartTime());
+        } else if (event.getType() == TaskAttemptEventType.TA_DONE
+            || event.getType() == TaskAttemptEventType.TA_FAIL_REQUEST
+            || event.getType() == TaskAttemptEventType.TA_KILL_REQUEST) {
+          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+              .getTaskAttemptID());
+          LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
+          clock.setTime(attInfo.getFinishTime());
+        }
+
+        else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
+            || event.getType() == TaskEventType.T_ATTEMPT_KILLED
+            || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
+          TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
+          LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
+          TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
+              .getTaskId());
+          taskInfo.getAllTaskAttempts().remove(
+              TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
+          // remove the task info from completed tasks if all attempts are
+          // recovered
+          if (taskInfo.getAllTaskAttempts().size() == 0) {
+            completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
+            // checkForRecoveryComplete
+            LOG.info("CompletedTasks() " + completedTasks.size());
+            if (allTasksRecovered()) {
+              if (!allContainersStopped()) {
+                stopRemainingContainers(actualHandler);
+              } else {
+                endRecovery(actualHandler);
+              }
+            }
+          }
+        } else if (event.getType() == AMSchedulerEventType.S_CONTAINER_COMPLETED) {
+          // This is the last event after a container completes. TA_TERMINATED
+          // messages to tasks would have gone out, and been processed before
+          // this. As a result, TASK_CLEAN generated by TA_TERMINATED would
+          // reach the InterceptingEventHandler (and is ignored) before this
+          // event type is dispatched.
+          // At this point, it's safe to remove this container from the
+          // containerInfo map.
+          AMSchedulerEventContainerCompleted cEvent = (AMSchedulerEventContainerCompleted)event;
+          ContainerId containerId = cEvent.getContainerId();
+          LOG.info("In Recovery, Container with id: " + containerId + " completed");
+          containerInfo.remove(containerId);
+
+          // Check if recovery is complete.
+          if (allTasksRecovered() && allContainersStopped()) {
+            endRecovery(actualHandler);
+          }
+          return; // S_CONTAINER_COMPLETED does not need to reach the scheduler.
+        }
+      }
+      realDispatch(event);
+    }
+    
+    public void realDispatch(Event event) {
+      super.dispatch(event);
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return handler;
+    }
+  }
+
+  private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
+    TaskInfo taskInfo = completedTasks.get(id.getTaskId());
+    return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
+  }
+
+  protected boolean allContainersStopped() {
+    return containerInfo.size() == 0;
+  }
+
+  protected boolean allTasksRecovered() {
+    return completedTasks.size() == 0;
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  protected void stopRemainingContainers(EventHandler eventHandler) {
+    for (ContainerId containerId : containerInfo.keySet()) {
+      eventHandler.handle(new AMContainerEvent(containerId,
+          AMContainerEventType.C_STOP_REQUEST));
+    }
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  protected void endRecovery(EventHandler eventHandler) {
+    recoveryMode = false;
+    clock.reset();
+    LOG.info("Setting the recovery mode to false. " + "Recovery is complete!");
+
+    // send all pending tasks schedule events
+    for (TaskEvent tEv : pendingTaskScheduleEvents) {
+      eventHandler.handle(tEv);
+    }
+  }
+
+  
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private class InterceptingEventHandler implements EventHandler {
+    // Intercept events before they're put onto the queue.
+    EventHandler actualHandler;
+
+    InterceptingEventHandler(EventHandler actualHandler) {
+      this.actualHandler = actualHandler;
+    }
+
+    @Override
+    public void handle(Event event) {
+      if (!recoveryMode) {
+        // delegate to the dispatcher one
+        actualHandler.handle(event);
+        return;
+      }
+
+      // Schedule previously finished attempts. Delay new ones till after recovery.
+      else if (event.getType() == TaskEventType.T_SCHEDULE) {
+        TaskEvent taskEvent = (TaskEvent) event;
+        // delay the scheduling of new tasks till previous ones are recovered
+        if (completedTasks.get(taskEvent.getTaskID()) == null) {
+          LOG.debug("Adding to pending task events "
+              + taskEvent.getTaskID());
+          pendingTaskScheduleEvents.add(taskEvent);
+          return;
+        }
+      }
+
+      // Intercept TaskAttempt start request.
+      else if (event.getType() == AMSchedulerEventType.S_TA_LAUNCH_REQUEST) {
+        TaskAttemptId aId = ((AMSchedulerTALaunchRequestEvent) event).getAttemptID();
+        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+        LOG.debug("TA_LAUNCH_REQUEST " + aId);
+        sendAssignedEvent(aId, attInfo, (AMSchedulerTALaunchRequestEvent)event);
+        return;
+      }
+
+      // Container Launch request. Mock and send back launched.
+      else if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
+        ContainerId cId = ((NMCommunicatorLaunchRequestEvent) event)
+            .getContainerId();
+        // Simulate container launch.
+        ContainerInfo cInfo = containerInfo.get(cId);
+        actualHandler.handle(new AMContainerEventLaunched(cId, cInfo
+            .getShufflePort()));
+
+        // Simulate a pull from the TaskAttempt
+        actualHandler.handle(new AMContainerEvent(cId,
+            AMContainerEventType.C_PULL_TA));
+
+        // Inform the TaskAttempt about the assignment.
+        actualHandler.handle(new TaskAttemptRemoteStartEvent(cInfo
+            .getNextAttemptId(), cId, null, cInfo.getShufflePort()));
+        
+        // TaskAttempt doesn't generate any useful event while in RUNNING.
+        // Generate events for the next state here.
+        TaskAttemptId aId = cInfo.getNextAttemptId();
+        
+        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+        // send the status update event
+        sendStatusUpdateEvent(aId, attInfo);
+
+        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
+        switch (state) {
+        case SUCCEEDED:
+          //recover the task output
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
+              attInfo.getAttemptId());
+          try { 
+            TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+            int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 
+            if (type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+              committer.recoverTask(taskContext);
+              LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+            } else {
+              LOG.info("Will not try to recover output for "
+                  + taskContext.getTaskAttemptID());
+            }
+          } catch (IOException e) {
+            LOG.error("Caught an exception while trying to recover task " + aId, e);
+            actualHandler.handle(new JobDiagnosticsUpdateEvent(
+                aId.getTaskId().getJobId(), "Error in recovering task output " + 
+                e.getMessage()));
+            actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
+                JobEventType.INTERNAL_ERROR));
+          }
+          
+          // send the done event
+          LOG.info("Sending done event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEvent(aId,
+              TaskAttemptEventType.TA_DONE));
+          // XXX (Post-3902) thh.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery.
+          break;
+        case KILLED:
+          LOG.info("Sending kill event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEventKillRequest(aId, "")); 
+          break;
+        default:
+          LOG.info("Sending fail event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEventFailRequest(aId, ""));
+          break;
+        }
+        return;
+      } 
+      
+      // Handle Events which may be sent to the scheduler.
+      else if (event.getType() == AMSchedulerEventType.S_TA_SUCCEEDED) {
+        // Inform the container that the task attempt succeeded.
+        AMSchedulerTASucceededEvent sEvent = (AMSchedulerTASucceededEvent)event;
+        
+        // Leave the entry in the map - needed if the TA fails after succeeding.
+        ContainerId containerId = attemptToContainerMap.get(sEvent.getAttemptID());
+        actualHandler.handle(new AMContainerTASucceededEvent(containerId,
+            sEvent.getAttemptID()));
+        return;
+        // XXX (Post-3902) tal.unregister happens here. Ensure THH handles it
+        // correctly in case of recovery.
+      }
+      else if (event.getType() == AMSchedulerEventType.S_TA_STOP_REQUEST) {
+        // Tell the container to stop.
+        AMSchedulerTAStopRequestEvent sEvent = (AMSchedulerTAStopRequestEvent) event;
+        ContainerId containerId = attemptToContainerMap.get(sEvent.getAttemptID());
+        actualHandler.handle(new AMContainerEvent(containerId,
+            AMContainerEventType.C_STOP_REQUEST));
+        return;
+        // XXX (Post-3902) chh.unregister happens here. Ensure THH handles it
+        // correctly in case of recovery.
+      }
+      
+      // Ignore container stop requests during recovery.
+      else if (event.getType() == NMCommunicatorEventType.CONTAINER_STOP_REQUEST) {
+        // Ignore. Unless we start relying on a successful NM.stopContainer() call.
+        return;
+      }
+      
+      // De-allocate containers used by previous attempts immediately.
+      else if (event.getType() == RMCommunicatorEventType.CONTAINER_DEALLOCATE) {
+        RMCommunicatorContainerDeAllocateRequestEvent dEvent = (RMCommunicatorContainerDeAllocateRequestEvent) event;
+        ContainerId cId = dEvent.getContainerId();
+        // exitStatus not known, diagnostics not known.
+        ContainerStatus cs = BuilderUtils.newContainerStatus(cId,
+            ContainerState.COMPLETE, "", 0);
+        actualHandler.handle(new AMContainerEventCompleted(cs));
+        return;
+      }
+      
+      // Received for FAILED/KILLED tasks after C_COMPLETED.
+      else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
+        TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
+        LOG.debug("TASK_CLEAN for attemptId: " + aId);
+        return;
+      }
+
+
+      // delegate to the actual handler
+      actualHandler.handle(event);
+    }
+
+    private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
+        TaskAttemptInfo attemptInfo) {
+      LOG.info("Sending status update event to " + yarnAttemptID);
+      TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+      taskAttemptStatus.id = yarnAttemptID;
+      taskAttemptStatus.progress = 1.0f;
+      taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
+      // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
+      taskAttemptStatus.phase = Phase.CLEANUP;
+      // Counters may legitimately be null for a recovered attempt.
+      taskAttemptStatus.counters = attemptInfo.getCounters();
+      actualHandler.handle(new TaskAttemptStatusUpdateEvent(
+          taskAttemptStatus.id, taskAttemptStatus));
+    }
+
+    private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
+        TaskAttemptInfo attemptInfo, AMSchedulerTALaunchRequestEvent event) {
+      LOG.info("Sending assigned event to " + yarnAttemptID);
+      ContainerId cId = attemptInfo.getContainerId();
+
+      NodeId nodeId =
+          ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+              + attemptInfo.getPort());
+      // Resource/Priority/ApplicationACLs are only needed while launching the
+      // container on an NM; these are already completed tasks, so they are set
+      // to null.
+      Container container = BuilderUtils.newContainer(cId, nodeId,
+          attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+          null, null, null);
+      
+      // Track shufflePort and attemptId for the container - this would
+      // normally be done by the scheduler.
+      ContainerInfo cInfo = containerInfo.get(cId);
+      if (cInfo == null) {
+        cInfo = new ContainerInfo(attemptInfo.getShufflePort());
+        containerInfo.put(cId, cInfo);
+      }
+      cInfo.setAttemptId(yarnAttemptID);
+      attemptToContainerMap.put(yarnAttemptID, cId);
+      
+      appContext.getAllNodes().nodeSeen(nodeId);
+      appContext.getAllContainers().addContainerIfNew(container);
+      
+      // Request container launch for new containers.
+      if (appContext.getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
+        actualHandler.handle(new AMContainerLaunchRequestEvent(cId, event,
+            null, yarnAttemptID.getTaskId().getJobId()));
+      }
+      // Assign the task attempt to this container.
+      actualHandler.handle(new AMContainerAssignTAEvent(cId, yarnAttemptID,
+          event.getRemoteTask()));
+    }
+    // TODO: Handle container launch request
+  }
+
+  private static class ContainerInfo {
+    int shufflePort;
+    TaskAttemptId nextAttemptId;
+
+    ContainerInfo(int shufflePort) {
+      this.shufflePort = shufflePort;
+    }
+
+    void setAttemptId(TaskAttemptId attemptId) {
+      this.nextAttemptId = attemptId;
+    }
+    
+    int getShufflePort() {
+      return shufflePort;
+    }
+    
+    TaskAttemptId getNextAttemptId() {
+      return nextAttemptId;
+    }
+  }
+
+}

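A note on the pattern above: while recoveryMode is set, RecoveryService interposes on both dispatch paths - the dispatcher's dispatch() and the EventHandler handed to components - so history events can be replayed without the real state machines seeing raw requests. A minimal, self-contained sketch of the same interception idea (the types below are illustrative stand-ins, not the actual Hadoop classes):

    // Illustrative stand-ins only - not the Hadoop Event/EventHandler types.
    interface Event { String type(); }

    interface EventHandler { void handle(Event event); }

    class InterceptingHandler implements EventHandler {
      private final EventHandler actualHandler; // the real, dispatcher-backed handler
      private volatile boolean recoveryMode = true;

      InterceptingHandler(EventHandler actualHandler) {
        this.actualHandler = actualHandler;
      }

      @Override
      public void handle(Event event) {
        if (!recoveryMode) {            // recovery over: everything passes through
          actualHandler.handle(event);
          return;
        }
        if ("T_SCHEDULE".equals(event.type())) {
          // During recovery: buffer, answer from history, or drop - as the
          // branches in InterceptingEventHandler.handle() above do.
          return;
        }
        actualHandler.handle(event);    // unrecognised events still pass through
      }

      void endRecovery() {
        recoveryMode = false;           // mirrors RecoveryService.endRecovery()
      }
    }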
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java?rev=1384614&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java Fri Sep 14 00:41:00 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMSchedulerEventContainerCompleted extends AMSchedulerEvent {
+
+  private final ContainerId containerId;
+  
+  public AMSchedulerEventContainerCompleted(ContainerId containerId) {
+    super(AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    this.containerId = containerId;
+  }
+  
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}

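The event is deliberately thin: a ContainerId plus the S_CONTAINER_COMPLETED type. A short usage sketch, assuming only what the patch shows (construction as in AMContainerImpl.sendCompletedToScheduler; containerId is assumed to be in scope):

    // Construction, as in AMContainerImpl.sendCompletedToScheduler():
    AMSchedulerEventContainerCompleted completed =
        new AMSchedulerEventContainerCompleted(containerId);
    // A consumer reads the id back; how the scheduler reacts is not part of this patch.
    ContainerId completedId = completed.getContainerId();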
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Sep 14 00:41:00 2012
@@ -396,6 +396,8 @@ public class RMContainerAllocator extend
   private void handleTaStopRequest(AMSchedulerTAStopRequestEvent event) {
     TaskAttemptId aId = event.getAttemptID();
     attemptToLaunchRequestMap.remove(aId);
+    // TODO XXX: This remove may need to be deferred. It is possible for a
+    // SUCCESSFUL taskAttempt to fail, which means the scheduler needs to remember
+    // taskAttempt-to-container assignments for a longer time.
     boolean removed = pendingReduces.remove(aId);
     if (!removed) {
       removed = scheduledRequests.remove(aId);
@@ -414,12 +416,17 @@ public class RMContainerAllocator extend
         } else {
           LOG.warn("Received a STOP request for absent taskAttempt: "
               + event.getAttemptID());
+          // This can be generated during recovery, with unhealthy nodes or
+          // fetch failures. It can be ignored, since recovered containers
+          // don't need to be stopped.
         }
       }
     }
   }
   
   private void handleTaSucceededRequest(AMSchedulerTASucceededEvent event) {
+    // TODO XXX Remember the assigned containerId even after task success.
+    // Required for TOO_MANY_FETCH_FAILURES
     attemptToLaunchRequestMap.remove(event.getAttemptID());
     ContainerId containerId = assignedRequests.remove(event.getAttemptID());
     if (containerId != null) { // TODO Should not be null. Confirm.

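Both TODOs in this hunk describe the same gap: an attempt can fail after it has succeeded (the TOO_MANY_FETCH_FAILURES case), so discarding the attempt-to-container assignment at success time loses state a later stop request would need. One possible shape for the deferral, strictly as a sketch - the succeededAttempts field below is hypothetical and not part of this patch:

    // Hypothetical sketch - succeededAttempts does not exist in the patch.
    private final Map<TaskAttemptId, ContainerId> succeededAttempts =
        new HashMap<TaskAttemptId, ContainerId>();

    private void handleTaSucceededRequest(AMSchedulerTASucceededEvent event) {
      attemptToLaunchRequestMap.remove(event.getAttemptID());
      ContainerId containerId = assignedRequests.remove(event.getAttemptID());
      if (containerId != null) {
        // Park the assignment instead of dropping it, so a failure-after-success
        // (TOO_MANY_FETCH_FAILURES) can still resolve the container to stop.
        succeededAttempts.put(event.getAttemptID(), containerId);
      }
      // ... existing success handling continues as in the patch ...
    }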
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Fri Sep 14 00:41:00 2012
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
-import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventReleased;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
 import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventNodeCountUpdated;
 import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventStateChanged;
 import org.apache.hadoop.yarn.Clock;
@@ -339,7 +339,7 @@ public class RMContainerRequestor extend
     
     // Inform the Containers about completion..
     for (ContainerStatus c : finishedContainers) {
-      eventHandler.handle(new AMContainerEventReleased(c));
+      eventHandler.handle(new AMContainerEventCompleted(c));
     }
    
     // Inform the scheduler about new containers.
@@ -347,7 +347,7 @@ public class RMContainerRequestor extend
     if (newContainers.size() > 0) {
       newContainerIds = new ArrayList<ContainerId>(newContainers.size());
       for (Container container : newContainers) {
-        context.getAllContainers().addNewContainer(container);
+        context.getAllContainers().addContainerIfNew(container);
         newContainerIds.add(container.getId()); 
         context.getAllNodes().nodeSeen(container.getNodeId());
       }

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java?rev=1384614&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEventCompleted.java Fri Sep 14 00:41:00 2012
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+public class AMContainerEventCompleted extends AMContainerEvent {
+
+  private final ContainerStatus containerStatus;
+
+  public AMContainerEventCompleted(ContainerStatus containerStatus) {
+    super(containerStatus.getContainerId(), AMContainerEventType.C_COMPLETED);
+    this.containerStatus = containerStatus;
+  }
+
+  public ContainerStatus getContainerStatus() {
+    return this.containerStatus;
+  }
+
+}

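A grounded usage example from the recovery path earlier in this commit: when a de-allocate request arrives for a recovered container, the real exit status is unknown, so a synthetic COMPLETE status is fabricated and wrapped in this event:

    // From RecoveryService's handling of CONTAINER_DEALLOCATE above:
    // exit status and diagnostics are unknown for a recovered container.
    ContainerStatus cs = BuilderUtils.newContainerStatus(containerId,
        ContainerState.COMPLETE, "", 0);
    eventHandler.handle(new AMContainerEventCompleted(cs));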
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerImpl.java Fri Sep 14 00:41:00 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.container;
 
 import java.io.IOException;
@@ -39,7 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventTerminated;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContaienrCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
 import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorStopRequestEvent;
@@ -346,8 +364,7 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       container.inError = true;
-      container.sendEvent(new TaskAttemptEventTerminated(event
-          .getTaskAttemptId()));
+      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
       container.sendCompletedToScheduler();
       container.deAllocate();
       LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId()
@@ -403,10 +420,10 @@ public class AMContainerImpl implements 
   }
   
   protected void sendCompletedToScheduler() {
-    sendEvent(new AMSchedulerEventContaienrCompleted(containerId));
+    sendEvent(new AMSchedulerEventContainerCompleted(containerId));
   }
   
-  protected void sendCompletedToTaskAttempt(TaskAttemptId taId) {
+  protected void sendTerminatedToTaskAttempt(TaskAttemptId taId) {
     sendEvent(new TaskAttemptEventTerminated(taId));
   }
 
@@ -445,7 +462,7 @@ public class AMContainerImpl implements 
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       if (container.pendingAttempt != null) {
         container.inError = true;
-        container.sendCompletedToTaskAttempt(event.getTaskAttemptId());
+        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
         container.deAllocate();
         return AMContainerState.STOPPING;
       }
@@ -534,7 +551,7 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       if (container.pendingAttempt != null) {
-        container.sendCompletedToTaskAttempt(container.pendingAttempt);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt);
         // TODO XXX Maybe nullify pendingAttempt.
       }
       container.sendCompletedToScheduler();
@@ -588,7 +605,7 @@ public class AMContainerImpl implements 
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       if (container.pendingAttempt != null) {
         container.inError = true;
-        container.sendCompletedToTaskAttempt(event.getTaskAttemptId());
+        container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
         container.sendStopRequestToNM();
         container.deAllocate();
         container.containerHeartbeatHandler.unregister(container.containerId);
@@ -614,8 +631,7 @@ public class AMContainerImpl implements 
       LOG.info("Cotnainer with id: " + container.getContainerId()
           + " Completed." + " Previous state was: " + container.getState());
       if (container.pendingAttempt != null) {
-        container.sendEvent(new TaskAttemptEvent(container.pendingAttempt,
-            TaskAttemptEventType.TA_TERMINATED));
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt);
       }
       container.sendCompletedToScheduler();
       container.containerHeartbeatHandler.unregister(container.containerId);
@@ -673,7 +689,7 @@ public class AMContainerImpl implements 
       SingleArcTransition<AMContainerImpl, AMContainerEvent> {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
-      container.sendCompletedToTaskAttempt(container.runningAttempt);
+      container.sendTerminatedToTaskAttempt(container.runningAttempt);
       container.sendCompletedToScheduler();
       container.containerHeartbeatHandler.unregister(container.containerId);
       container.unregisterAttemptFromListener(container.runningAttempt);
@@ -694,7 +710,7 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       super.transition(container, cEvent);
       container.unregisterAttemptFromListener(container.runningAttempt);
-      container.unregisterJvmFromListener(container.jvmId);
+//      container.unregisterJvmFromListener(container.jvmId);
       // TODO XXX: All running transition. verify whether runningAttempt should be null.
       container.interruptedEvent = container.runningAttempt;
       container.runningAttempt = null;
@@ -740,7 +756,7 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       container.inError = true;
-      container.sendCompletedToTaskAttempt(event.getTaskAttemptId());
+      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
     }
   }
 
@@ -768,7 +784,7 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
-      container.sendCompletedToTaskAttempt(event.getTaskAttemptId());
+      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
     }
   }
   
@@ -782,13 +798,13 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       if (container.pendingAttempt != null) {
-        container.sendCompletedToTaskAttempt(container.pendingAttempt);
+        container.sendTerminatedToTaskAttempt(container.pendingAttempt);
       }
       if (container.runningAttempt != null) {
-        container.sendCompletedToTaskAttempt(container.runningAttempt);
+        container.sendTerminatedToTaskAttempt(container.runningAttempt);
       }
       if (container.interruptedEvent != null) {
-        container.sendCompletedToTaskAttempt(container.interruptedEvent);
+        container.sendTerminatedToTaskAttempt(container.interruptedEvent);
       }
       container.sendCompletedToScheduler();
     }
@@ -860,7 +876,7 @@ public class AMContainerImpl implements 
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerAssignTAEvent event = (AMContainerAssignTAEvent) cEvent;
       container.inError = true;
-      container.sendCompletedToTaskAttempt(event.getTaskAttemptId());
+      container.sendTerminatedToTaskAttempt(event.getTaskAttemptId());
       container.sendStopRequestToNM();
       container.deAllocate();
       container.unregisterAttemptFromListener(container.runningAttempt);

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java?rev=1384614&r1=1384613&r2=1384614&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerMap.java Fri Sep 14 00:41:00 2012
@@ -3,6 +3,8 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
@@ -14,6 +16,8 @@ import org.apache.hadoop.yarn.service.Ab
 public class AMContainerMap extends AbstractService
     implements EventHandler<AMContainerEvent> {
 
+  private static final Log LOG = LogFactory.getLog(AMContainerMap.class);
+  
   private final ContainerHeartbeatHandler chh;
   private final TaskAttemptListener tal;
   @SuppressWarnings("rawtypes")
@@ -38,10 +42,10 @@ public class AMContainerMap extends Abst
     containerMap.get(containerId).handle(event);
   }
 
-  public void addNewContainer(Container container) {
+  public void addContainerIfNew(Container container) {
     AMContainer amc = new AMContainerImpl(container, chh, tal, eventHandler,
         context);
-    containerMap.put(container.getId(), amc);
+    containerMap.putIfAbsent(container.getId(), amc);
   }
   
   public AMContainer get(ContainerId containerId) {

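The addNewContainer to addContainerIfNew rename is load-bearing for recovery: both the recovery path (sendAssignedEvent calls addContainerIfNew) and the RM allocate response in RMContainerRequestor can report the same container, and putIfAbsent keeps the first AMContainer - and any state it has accumulated - instead of silently replacing it. A self-contained illustration of that difference, using stand-in types rather than the Hadoop ones:

    import java.util.concurrent.ConcurrentHashMap;

    // Stand-in demo: put() would replace an existing entry, discarding its state;
    // putIfAbsent() keeps the first registration - the behaviour recovery relies on.
    public class PutIfAbsentDemo {
      public static void main(String[] args) {
        ConcurrentHashMap<String, String> containers =
            new ConcurrentHashMap<String, String>();
        containers.putIfAbsent("container_1", "registered-during-recovery");
        // A second report of the same container id is a no-op:
        containers.putIfAbsent("container_1", "registered-from-RM-response");
        System.out.println(containers.get("container_1"));
        // prints: registered-during-recovery
      }
    }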

