hadoop-mapreduce-commits mailing list archives

From cmcc...@apache.org
Subject svn commit: r1619012 [3/7] - in /hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Date Tue, 19 Aug 2014 23:50:52 GMT
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -38,6 +39,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -58,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -70,9 +73,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -143,19 +149,26 @@ public class RMContainerAllocator extend
   private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
-  private int mapResourceReqt;//memory
-  private int reduceResourceReqt;//memory
+  private int mapResourceRequest;//memory
+  private int reduceResourceRequest;//memory
   
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
+  /**
+   * After this threshold, if the container request is not allocated, it is
+   * considered delayed.
+   */
+  private long allocationDelayThresholdMs = 0;
   private float reduceSlowStart = 0;
   private long retryInterval;
   private long retrystartTime;
+  private Clock clock;
 
   private final AMPreemptionPolicy preemptionPolicy;
 
-  BlockingQueue<ContainerAllocatorEvent> eventQueue
+  @VisibleForTesting
+  protected BlockingQueue<ContainerAllocatorEvent> eventQueue
     = new LinkedBlockingQueue<ContainerAllocatorEvent>();
 
   private ScheduleStats scheduleStats = new ScheduleStats();
@@ -165,6 +178,7 @@ public class RMContainerAllocator extend
     super(clientService, context);
     this.preemptionPolicy = preemptionPolicy;
     this.stopped = new AtomicBoolean(false);
+    this.clock = context.getClock();
   }
 
   @Override
@@ -179,6 +193,9 @@ public class RMContainerAllocator extend
     maxReducePreemptionLimit = conf.getFloat(
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
+    allocationDelayThresholdMs = conf.getInt(
+        MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+        MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
     RackResolver.init(conf);
     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
                                 MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@@ -245,7 +262,7 @@ public class RMContainerAllocator extend
           getJob().getTotalMaps(), completedMaps,
           scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
           assignedRequests.maps.size(), assignedRequests.reduces.size(),
-          mapResourceReqt, reduceResourceReqt,
+          mapResourceRequest, reduceResourceRequest,
           pendingReduces.size(), 
           maxReduceRampupLimit, reduceSlowStart);
       recalculateReduceSchedule = false;
@@ -267,6 +284,18 @@ public class RMContainerAllocator extend
     scheduleStats.log("Final Stats: ");
   }
 
+  @Private
+  @VisibleForTesting
+  AssignedRequests getAssignedRequests() {
+    return assignedRequests;
+  }
+
+  @Private
+  @VisibleForTesting
+  ScheduledRequests getScheduledRequests() {
+    return scheduledRequests;
+  }
+
   public boolean getIsReduceStarted() {
     return reduceStarted;
   }
@@ -302,16 +331,16 @@ public class RMContainerAllocator extend
       int supportedMaxContainerCapability =
           getMaxContainerCapability().getMemory();
       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-        if (mapResourceReqt == 0) {
-          mapResourceReqt = reqEvent.getCapability().getMemory();
+        if (mapResourceRequest == 0) {
+          mapResourceRequest = reqEvent.getCapability().getMemory();
           eventHandler.handle(new JobHistoryEvent(jobId, 
               new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
-              mapResourceReqt)));
-          LOG.info("mapResourceReqt:"+mapResourceReqt);
-          if (mapResourceReqt > supportedMaxContainerCapability) {
+                  mapResourceRequest)));
+          LOG.info("mapResourceRequest:"+ mapResourceRequest);
+          if (mapResourceRequest > supportedMaxContainerCapability) {
             String diagMsg = "MAP capability required is more than the supported " +
-            "max container capability in the cluster. Killing the Job. mapResourceReqt: " + 
-            mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
+            "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
+                mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
                 jobId, diagMsg));
@@ -319,20 +348,20 @@ public class RMContainerAllocator extend
           }
         }
         //set the rounded off memory
-        reqEvent.getCapability().setMemory(mapResourceReqt);
+        reqEvent.getCapability().setMemory(mapResourceRequest);
         scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
       } else {
-        if (reduceResourceReqt == 0) {
-          reduceResourceReqt = reqEvent.getCapability().getMemory();
+        if (reduceResourceRequest == 0) {
+          reduceResourceRequest = reqEvent.getCapability().getMemory();
           eventHandler.handle(new JobHistoryEvent(jobId, 
               new NormalizedResourceEvent(
                   org.apache.hadoop.mapreduce.TaskType.REDUCE,
-              reduceResourceReqt)));
-          LOG.info("reduceResourceReqt:"+reduceResourceReqt);
-          if (reduceResourceReqt > supportedMaxContainerCapability) {
+                  reduceResourceRequest)));
+          LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
+          if (reduceResourceRequest > supportedMaxContainerCapability) {
             String diagMsg = "REDUCE capability required is more than the " +
             		"supported max container capability in the cluster. Killing the " +
-            		"Job. reduceResourceReqt: " + reduceResourceReqt +
+            		"Job. reduceResourceRequest: " + reduceResourceRequest +
             		" maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
@@ -341,7 +370,7 @@ public class RMContainerAllocator extend
           }
         }
         //set the rounded off memory
-        reqEvent.getCapability().setMemory(reduceResourceReqt);
+        reqEvent.getCapability().setMemory(reduceResourceRequest);
         if (reqEvent.getEarlierAttemptFailed()) {
           //add to the front of queue for fail fast
           pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@@ -365,6 +394,7 @@ public class RMContainerAllocator extend
           removed = true;
           assignedRequests.remove(aId);
           containersReleased++;
+          pendingRelease.add(containerId);
           release(containerId);
         }
       }
@@ -393,8 +423,22 @@ public class RMContainerAllocator extend
     return host;
   }
 
-  private void preemptReducesIfNeeded() {
-    if (reduceResourceReqt == 0) {
+  @Private
+  @VisibleForTesting
+  synchronized void setReduceResourceRequest(int mem) {
+    this.reduceResourceRequest = mem;
+  }
+
+  @Private
+  @VisibleForTesting
+  synchronized void setMapResourceRequest(int mem) {
+    this.mapResourceRequest = mem;
+  }
+
+  @Private
+  @VisibleForTesting
+  void preemptReducesIfNeeded() {
+    if (reduceResourceRequest == 0) {
       return; //no reduces
     }
     //check if reduces have taken over the whole cluster and there are 
@@ -402,9 +446,9 @@ public class RMContainerAllocator extend
     if (scheduledRequests.maps.size() > 0) {
       int memLimit = getMemLimit();
       int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
-          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
+          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
      //availableMemForMap must be sufficient to run at least 1 map
-      if (availableMemForMap < mapResourceReqt) {
+      if (availableMemForMap < mapResourceRequest) {
         //to make sure new containers are given to maps and not reduces
         //ramp down all scheduled reduces if any
         //(since reduces are scheduled at higher priority than maps)
@@ -413,22 +457,40 @@ public class RMContainerAllocator extend
           pendingReduces.add(req);
         }
         scheduledRequests.reduces.clear();
-        
-        //preempt for making space for at least one map
-        int premeptionLimit = Math.max(mapResourceReqt, 
-            (int) (maxReducePreemptionLimit * memLimit));
-        
-        int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, 
-            premeptionLimit);
-        
-        int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
-        toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
-        
-        LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
-        assignedRequests.preemptReduce(toPreempt);
+
+        //do further checking to find the number of map requests that were
+        //hanging around for a while
+        int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
+        if (hangingMapRequests > 0) {
+          //preempt for making space for at least one map
+          int preemptionLimit = Math.max(mapResourceRequest,
+              (int) (maxReducePreemptionLimit * memLimit));
+
+          int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
+              preemptionLimit);
+
+          int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
+          toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
+
+          LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
+          assignedRequests.preemptReduce(toPreempt);
+        }
       }
     }
   }
+
+  private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+    if (allocationDelayThresholdMs <= 0)
+      return requestMap.size();
+    int hangingRequests = 0;
+    long currTime = clock.getTime();
+    for (ContainerRequest request: requestMap.values()) {
+      long delay = currTime - request.requestTimeMs;
+      if (delay > allocationDelayThresholdMs)
+        hangingRequests++;
+    }
+    return hangingRequests;
+  }
   
   @Private
   public void scheduleReduces(
@@ -585,6 +647,15 @@ public class RMContainerAllocator extend
     if (response.getAMCommand() != null) {
       switch(response.getAMCommand()) {
       case AM_RESYNC:
+          LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+              + " hence resyncing.");
+          lastResponseID = 0;
+
+          // Registering to allow RM to discover an active AM for this
+          // application
+          register();
+          addOutstandingRequestOnResync();
+          break;
       case AM_SHUTDOWN:
         // This can happen if the RM has been restarted. If it is in that state,
         // this application must clean itself up.
@@ -608,7 +679,12 @@ public class RMContainerAllocator extend
             nmToken.getToken());
       }
     }
-    
+
+    // Setting AMRMToken
+    if (response.getAMRMToken() != null) {
+      updateAMRMToken(response.getAMRMToken());
+    }
+
     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
 
     // propagate preemption requests
@@ -644,6 +720,7 @@ public class RMContainerAllocator extend
         LOG.error("Container complete event for unknown container id "
             + cont.getContainerId());
       } else {
+        pendingRelease.remove(cont.getContainerId());
         assignedRequests.remove(attemptID);
         
         // send the container completed event to Task attempt
@@ -659,11 +736,24 @@ public class RMContainerAllocator extend
     }
     return newContainers;
   }
-  
+
+  private void updateAMRMToken(Token token) throws IOException {
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+        new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
+          .getIdentifier().array(), token.getPassword().array(), new Text(
+          token.getKind()), new Text(token.getService()));
+    UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+    if (UserGroupInformation.isSecurityEnabled()) {
+      currentUGI = UserGroupInformation.getLoginUser();
+    }
+    currentUGI.addToken(amrmToken);
+  }
+
   @VisibleForTesting
   public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
       TaskAttemptId attemptID) {
-    if (cont.getExitStatus() == ContainerExitStatus.ABORTED) {
+    if (cont.getExitStatus() == ContainerExitStatus.ABORTED
+        || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) {
       // killed by framework
       return new TaskAttemptEvent(attemptID,
           TaskAttemptEventType.TA_KILL);
@@ -714,11 +804,13 @@ public class RMContainerAllocator extend
   @Private
   public int getMemLimit() {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
-    return headRoom + assignedRequests.maps.size() * mapResourceReqt + 
-       assignedRequests.reduces.size() * reduceResourceReqt;
+    return headRoom + assignedRequests.maps.size() * mapResourceRequest +
+       assignedRequests.reduces.size() * reduceResourceRequest;
   }
-  
-  private class ScheduledRequests {
+
+  @Private
+  @VisibleForTesting
+  class ScheduledRequests {
     
     private final LinkedList<TaskAttemptId> earlierFailedMaps = 
       new LinkedList<TaskAttemptId>();
@@ -728,7 +820,8 @@ public class RMContainerAllocator extend
       new HashMap<String, LinkedList<TaskAttemptId>>();
     private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = 
       new HashMap<String, LinkedList<TaskAttemptId>>();
-    private final Map<TaskAttemptId, ContainerRequest> maps = 
+    @VisibleForTesting
+    final Map<TaskAttemptId, ContainerRequest> maps =
       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
     
     private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = 
@@ -824,22 +917,22 @@ public class RMContainerAllocator extend
         int allocatedMemory = allocated.getResource().getMemory();
         if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
             || PRIORITY_MAP.equals(priority)) {
-          if (allocatedMemory < mapResourceReqt
+          if (allocatedMemory < mapResourceRequest
               || maps.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a map as either "
-                + " container memory less than required " + mapResourceReqt
+                + " container memory less than required " + mapResourceRequest
                 + " or no pending map tasks - maps.isEmpty=" 
                 + maps.isEmpty()); 
             isAssignable = false; 
           }
         } 
         else if (PRIORITY_REDUCE.equals(priority)) {
-          if (allocatedMemory < reduceResourceReqt
+          if (allocatedMemory < reduceResourceRequest
               || reduces.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a reduce as either "
-                + " container memory less than required " + reduceResourceReqt
+                + " container memory less than required " + reduceResourceRequest
                 + " or no pending reduce tasks - reduces.isEmpty=" 
                 + reduces.isEmpty()); 
             isAssignable = false;
@@ -931,6 +1024,7 @@ public class RMContainerAllocator extend
     
     private void containerNotAssigned(Container allocated) {
       containersReleased++;
+      pendingRelease.add(allocated.getId());
       release(allocated.getId());      
     }
     
@@ -1118,14 +1212,18 @@ public class RMContainerAllocator extend
     }
   }
 
-  private class AssignedRequests {
+  @Private
+  @VisibleForTesting
+  class AssignedRequests {
     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
       new HashMap<ContainerId, TaskAttemptId>();
     private final LinkedHashMap<TaskAttemptId, Container> maps = 
       new LinkedHashMap<TaskAttemptId, Container>();
-    private final LinkedHashMap<TaskAttemptId, Container> reduces = 
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, Container> reduces =
       new LinkedHashMap<TaskAttemptId, Container>();
-    private final Set<TaskAttemptId> preemptionWaitingReduces = 
+    @VisibleForTesting
+    final Set<TaskAttemptId> preemptionWaitingReduces =
       new HashSet<TaskAttemptId>();
     
     void add(Container container, TaskAttemptId tId) {
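
A minimal, self-contained sketch of the delayed-preemption logic introduced
above (MR_JOB_REDUCER_PREEMPT_DELAY_SEC): reduces are only preempted for map
requests that have waited longer than the threshold, instead of for every
scheduled map. SimpleRequest and the values in main() are hypothetical
stand-ins, not the commit's code; the real logic lives in
getNumOfHangingRequests() and preemptReducesIfNeeded().

    import java.util.Arrays;
    import java.util.List;

    public class DelayedPreemptionSketch {
      static class SimpleRequest {
        final long requestTimeMs;
        SimpleRequest(long requestTimeMs) { this.requestTimeMs = requestTimeMs; }
      }

      // Mirrors getNumOfHangingRequests(): count requests older than threshold.
      static int hangingRequests(List<SimpleRequest> pending, long nowMs,
          long thresholdMs) {
        if (thresholdMs <= 0) {
          return pending.size(); // threshold disabled: every request counts
        }
        int hanging = 0;
        for (SimpleRequest r : pending) {
          if (nowMs - r.requestTimeMs > thresholdMs) {
            hanging++;
          }
        }
        return hanging;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<SimpleRequest> pending = Arrays.asList(
            new SimpleRequest(now - 120000), // has waited two minutes
            new SimpleRequest(now - 1000));  // just placed
        // With a 60s threshold only the first request is hanging, so the
        // preemption amount is sized for one map, not all scheduled maps.
        System.out.println(hangingRequests(pending, now, 60000)); // prints 1
      }
    }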

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Tue Aug 19 23:49:39 2014
@@ -29,8 +29,10 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -38,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -56,7 +59,7 @@ public abstract class RMContainerRequest
   
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
 
-  private int lastResponseID;
+  protected int lastResponseID;
   private Resource availableResources;
 
   private final RecordFactory recordFactory =
@@ -75,8 +78,11 @@ public abstract class RMContainerRequest
  // numContainers don't end up as duplicates
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
-  private final Set<ContainerId> release = new TreeSet<ContainerId>(); 
-
+  private final Set<ContainerId> release = new TreeSet<ContainerId>();
+  // pendingRelease holds the history of release requests; a request is removed
+  // only when the RM reports the container as completed.
+  // How is it different from release? release is per allocate() request.
+  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -96,6 +102,8 @@ public abstract class RMContainerRequest
     super(clientService, context);
   }
 
+  @Private
+  @VisibleForTesting
   static class ContainerRequest {
     final TaskAttemptId attemptID;
     final Resource capability;
@@ -103,20 +111,39 @@ public abstract class RMContainerRequest
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
-    
+    /**
+     * The time when this request object was formed; it can be used to avoid
+     * aggressive preemption for recently placed requests.
+     */
+    final long requestTimeMs;
+
     public ContainerRequest(ContainerRequestEvent event, Priority priority) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
           event.getRacks(), priority);
     }
-    
+
+    public ContainerRequest(ContainerRequestEvent event, Priority priority,
+                            long requestTimeMs) {
+      this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+          event.getRacks(), priority, requestTimeMs);
+    }
+
+    public ContainerRequest(TaskAttemptId attemptID,
+                            Resource capability, String[] hosts, String[] racks,
+                            Priority priority) {
+      this(attemptID, capability, hosts, racks, priority,
+          System.currentTimeMillis());
+    }
+
     public ContainerRequest(TaskAttemptId attemptID,
-        Resource capability, String[] hosts, String[] racks, 
-        Priority priority) {
+        Resource capability, String[] hosts, String[] racks,
+        Priority priority, long requestTimeMs) {
       this.attemptID = attemptID;
       this.capability = capability;
       this.hosts = hosts;
       this.racks = racks;
       this.priority = priority;
+      this.requestTimeMs = requestTimeMs;
     }
     
     public String toString() {
@@ -163,6 +190,10 @@ public abstract class RMContainerRequest
     } catch (YarnException e) {
       throw new IOException(e);
     }
+
+    if (isResyncCommand(allocateResponse)) {
+      return allocateResponse;
+    }
     lastResponseID = allocateResponse.getResponseId();
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
@@ -191,6 +222,28 @@ public abstract class RMContainerRequest
     return allocateResponse;
   }
 
+  protected boolean isResyncCommand(AllocateResponse allocateResponse) {
+    return allocateResponse.getAMCommand() != null
+        && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+  }
+
+  protected void addOutstandingRequestOnResync() {
+    for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
+        .values()) {
+      for (Map<Resource, ResourceRequest> capabilities : rr.values()) {
+        for (ResourceRequest request : capabilities.values()) {
+          addResourceRequestToAsk(request);
+        }
+      }
+    }
+    if (!ignoreBlacklisting.get()) {
+      blacklistAdditions.addAll(blacklistedNodes);
+    }
+    if (!pendingRelease.isEmpty()) {
+      release.addAll(pendingRelease);
+    }
+  }
+
  // May be incorrect if there are multiple NodeManagers running on a single
  // host. knownNodeCount is based on node managers, not hosts. Blacklisting is
  // currently based on hosts.
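
The pendingRelease set added above, together with the AM_RESYNC handling in
RMContainerAllocator, lets the AM replay its state after a ResourceManager
restart. A compressed, hypothetical model of that replay (field types and the
register() call are simplified; this is not the real RMContainerRequestor
API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    class ResyncReplayModel {
      int lastResponseID = 1234;                   // sequence with the old RM
      final List<String> ask = new ArrayList<>();  // outstanding resource asks
      final Set<String> release = new TreeSet<>(); // per-allocate release batch
      final Set<String> pendingRelease = new TreeSet<>(); // unconfirmed releases

      // What one heartbeat does when the RM answers AM_RESYNC.
      void onResync(List<String> outstandingRequests, Set<String> blacklisted,
          Set<String> blacklistAdditions) {
        lastResponseID = 0;             // restart the request/response sequence
        // register();                  // let the restarted RM rediscover this AM
        ask.addAll(outstandingRequests);        // re-send every outstanding ask
        blacklistAdditions.addAll(blacklisted); // re-send the blacklist
        release.addAll(pendingRelease); // re-send releases the RM never confirmed
      }
    }

Unlike release, which is drained on every allocate() call, pendingRelease
keeps a container id until the RM reports it completed, so release requests
survive the restart.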

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java Tue Aug 19 23:49:39 2014
@@ -29,7 +29,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
@@ -52,13 +54,18 @@ public class KillAMPreemptionPolicy impl
   public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
     // for both strict and negotiable preemption requests kill the
     // container
-    for (PreemptionContainer c :
-        preemptionRequests.getStrictContract().getContainers()) {
-      killContainer(ctxt, c);
+    StrictPreemptionContract strictContract = preemptionRequests
+        .getStrictContract();
+    if (strictContract != null) {
+      for (PreemptionContainer c : strictContract.getContainers()) {
+        killContainer(ctxt, c);
+      }
     }
-    for (PreemptionContainer c :
-         preemptionRequests.getContract().getContainers()) {
-       killContainer(ctxt, c);
+    PreemptionContract contract = preemptionRequests.getContract();
+    if (contract != null) {
+      for (PreemptionContainer c : contract.getContainers()) {
+        killContainer(ctxt, c);
+      }
     }
   }
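
The guards above fix a latent NullPointerException: a PreemptionMessage may
carry a strict contract, a negotiable contract, or only one of the two. The
same pattern in a self-contained sketch, with a hypothetical Contract type
standing in for YARN's StrictPreemptionContract/PreemptionContract:

    import java.util.Collections;
    import java.util.List;

    class NullSafePreemptionSketch {
      static class Contract {
        List<String> containers = Collections.emptyList();
      }

      // Handle whichever contracts are present; either may legitimately be null.
      static void preempt(Contract strict, Contract negotiable) {
        if (strict != null) {
          strict.containers.forEach(c -> System.out.println("killing " + c));
        }
        if (negotiable != null) {
          negotiable.containers.forEach(c -> System.out.println("killing " + c));
        }
      }

      public static void main(String[] args) {
        preempt(null, new Contract()); // this shape used to throw an NPE
      }
    }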
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Tue Aug 19 23:49:39 2014
@@ -85,7 +85,8 @@ public class TaskPage extends AppView {
         .append(ta.getId()).append("\",\"")
         .append(progress).append("\",\"")
         .append(ta.getState().toString()).append("\",\"")
-        .append(ta.getStatus()).append("\",\"")
+        .append(StringEscapeUtils.escapeJavaScript(
+              StringEscapeUtils.escapeHtml(ta.getStatus()))).append("\",\"")
 
         .append(nodeHttpAddr == null ? "N/A" :
           "<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>"

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.uti
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE;
 
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
@@ -102,7 +103,8 @@ public class TasksBlock extends HtmlBloc
       .append(join(pct, '%')).append("'> ").append("<div class='")
       .append(C_PROGRESSBAR_VALUE).append("' style='")
       .append(join("width:", pct, '%')).append("'> </div> </div>\",\"")
-      .append(info.getStatus()).append("\",\"")
+      .append(StringEscapeUtils.escapeJavaScript(
+              StringEscapeUtils.escapeHtml(info.getStatus()))).append("\",\"")
 
       .append(info.getState()).append("\",\"")
       .append(info.getStartTime()).append("\",\"")
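
Both pages embed the task status, which can contain user-influenced text,
inside a JavaScript string that is itself inside HTML, so it must be escaped
for both contexts in that order. A small demo of the same commons-lang calls
(the sample status string is made up):

    import org.apache.commons.lang.StringEscapeUtils;

    class EscapeStatusDemo {
      public static void main(String[] args) {
        String status = "<img src=x onerror=alert(1)>\"; evil(); //";
        // HTML-escape first, then JS-escape, matching the page-building order.
        String safe = StringEscapeUtils.escapeJavaScript(
            StringEscapeUtils.escapeHtml(status));
        System.out.println(safe); // inert as markup and as a JS string literal
      }
    }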

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Tue Aug 19 23:49:39 2014
@@ -63,6 +63,13 @@ public class TestTaskAttemptListenerImpl
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
+        RMHeartbeatHandler rmHeartbeatHandler, AMPreemptionPolicy policy) {
+
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, policy);
+    }
+
+    public MockTaskAttemptListenerImpl(AppContext context,
+        JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
         TaskHeartbeatHandler hbHandler,
         AMPreemptionPolicy policy) {
@@ -210,7 +217,7 @@ public class TestTaskAttemptListenerImpl
     when(appCtx.getEventHandler()).thenReturn(ea);
     CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
     policy.init(appCtx);
-    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+    TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
         appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
@@ -271,7 +278,7 @@ public class TestTaskAttemptListenerImpl
     when(appCtx.getEventHandler()).thenReturn(ea);
     CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
     policy.init(appCtx);
-    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+    TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
         appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
@@ -326,7 +333,7 @@ public class TestTaskAttemptListenerImpl
     when(appCtx.getEventHandler()).thenReturn(ea);
     CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
     policy.init(appCtx);
-    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+    TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
         appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,7 @@ import java.io.DataInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 
-import static junit.framework.Assert.*;
+import static org.junit.Assert.*;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -28,15 +28,21 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.never;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -52,6 +58,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertFalse;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -60,6 +70,26 @@ public class TestJobHistoryEventHandler 
 
   private static final Log LOG = LogFactory
       .getLog(TestJobHistoryEventHandler.class);
+  private static MiniDFSCluster dfsCluster = null;
+  private static String coreSitePath;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    coreSitePath = "." + File.separator + "target" + File.separator +
+            "test-classes" + File.separator + "core-site.xml";
+    Configuration conf = new HdfsConfiguration();
+    dfsCluster = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @AfterClass
+  public static void cleanUpClass() throws Exception {
+    dfsCluster.shutdown();
+  }
+
+  @After
+  public void cleanTest() throws Exception {
+    new File(coreSitePath).delete();
+  }
 
   @Test (timeout=50000)
   public void testFirstFlushOnCompletionEvent() throws Exception {
@@ -325,6 +355,50 @@ public class TestJobHistoryEventHandler 
     }
   }
 
+  @Test (timeout=50000)
+  public void testDefaultFsIsUsedForHistory() throws Exception {
+    // Create default configuration pointing to the minicluster
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+            dfsCluster.getURI().toString());
+    FileOutputStream os = new FileOutputStream(coreSitePath);
+    conf.writeXml(os);
+    os.close();
+
+    // simulate execution under a non-default namenode
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+            "file:///");
+
+    TestParams t = new TestParams();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0, false);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+          TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+          new Counters(), new Counters())));
+
+      // If we got here, the event handler worked, but we don't know with
+      // which file system. Check that the history files were written to the
+      // minicluster.
+      FileSystem dfsFileSystem = dfsCluster.getFileSystem();
+      assertTrue("Minicluster contains some history files",
+          dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0);
+      FileSystem localFileSystem = LocalFileSystem.get(conf);
+      assertFalse("No history directory on non-default file system",
+          localFileSystem.exists(new Path(t.dfsWorkDir)));
+    } finally {
+      jheh.stop();
+    }
+  }
+
   private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
     jheh.handle(event);
   }
@@ -372,6 +446,7 @@ public class TestJobHistoryEventHandler 
   private class TestParams {
     boolean isLastAMRetry;
     String workDir = setupTestWorkDir();
+    String dfsWorkDir = "/" + this.getClass().getCanonicalName();
     ApplicationId appId = ApplicationId.newInstance(200, 1);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 1);
@@ -451,10 +526,16 @@ public class TestJobHistoryEventHandler 
 class JHEvenHandlerForTest extends JobHistoryEventHandler {
 
   private EventWriter eventWriter;
+  private boolean mockHistoryProcessing = true;
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
   }
 
+  public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
+    super(context, startCount);
+    this.mockHistoryProcessing = mockHistoryProcessing;
+  }
+
   @Override
   protected void serviceStart() {
   }
@@ -462,7 +543,12 @@ class JHEvenHandlerForTest extends JobHi
   @Override
   protected EventWriter createEventWriter(Path historyFilePath)
       throws IOException {
-    this.eventWriter = mock(EventWriter.class);
+    if (mockHistoryProcessing) {
+      this.eventWriter = mock(EventWriter.class);
+    } else {
+      this.eventWriter = super.createEventWriter(historyFilePath);
+    }
     return this.eventWriter;
   }
 
@@ -475,8 +561,13 @@ class JHEvenHandlerForTest extends JobHi
   }
 
   @Override
-  protected void processDoneFiles(JobId jobId){
-    // do nothing
+  protected void processDoneFiles(JobId jobId) throws IOException {
+    if (!mockHistoryProcessing) {
+      super.processDoneFiles(jobId);
+    } else {
+      // do nothing
+    }
   }
 }
 
@@ -501,4 +592,4 @@ class JHEventHandlerForSigtermTest exten
     this.lastEventHandled = event;
     this.eventsHandled++;
   }
-}
\ No newline at end of file
+}
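
testDefaultFsIsUsedForHistory works by planting a core-site.xml under
target/test-classes, which is on the test classpath, so any Configuration
created later resolves fs.defaultFS to the minicluster even though the job
configuration says file:///. A minimal sketch of that trick, with a
hypothetical URI in place of dfsCluster.getURI():

    import java.io.FileOutputStream;
    import org.apache.hadoop.conf.Configuration;

    class PlantCoreSiteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(false); // no default resources
        conf.set("fs.defaultFS", "hdfs://localhost:8020"); // minicluster URI
        try (FileOutputStream os =
            new FileOutputStream("target/test-classes/core-site.xml")) {
          conf.writeXml(os); // new Configuration() now sees this default FS
        }
      }
    }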

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Aug 19 23:49:39 2014
@@ -24,8 +24,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
 
 
 /**
@@ -228,8 +228,8 @@ public class MRApp extends MRAppMaster {
       int maps, int reduces, boolean autoComplete, String testName,
       boolean cleanOnStart, int startCount, Clock clock, boolean unregistered,
       String assignedQueue) {
-    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
-        .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock,
+        System.currentTimeMillis());
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     LOG.info("PathUsed: " + testAbsPath);
@@ -573,7 +573,8 @@ public class MRApp extends MRAppMaster {
         Resource resource = Resource.newInstance(1234, 2);
         ContainerTokenIdentifier containerTokenIdentifier =
             new ContainerTokenIdentifier(cId, nodeId.toString(), "user",
-              resource, System.currentTimeMillis() + 10000, 42, 42);
+            resource, System.currentTimeMillis() + 10000, 42, 42,
+            Priority.newInstance(0), 0);
         Token containerToken = newContainerToken(nodeId, "password".getBytes(),
               containerTokenIdentifier);
         Container container = Container.newInstance(cId, nodeId,
@@ -627,10 +628,18 @@ public class MRApp extends MRAppMaster {
           throws IOException {
         committer.abortJob(jobContext, state);
       }
+
+      @Override
+      public boolean isRecoverySupported(JobContext jobContext) throws IOException{
+        return committer.isRecoverySupported(jobContext);
+      }
+
+      @SuppressWarnings("deprecation")
       @Override
       public boolean isRecoverySupported() {
         return committer.isRecoverySupported();
       }
+
       @Override
       public void setupTask(TaskAttemptContext taskContext)
           throws IOException {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java Tue Aug 19 23:49:39 2014
@@ -143,4 +143,9 @@ public class MockAppContext implements A
     return true;
   }
 
+  @Override
+  public String getNMHostname() {
+    // bogus - Not Required
+    return null;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java Tue Aug 19 23:49:39 2014
@@ -21,7 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.Iterator;
 import java.util.List;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Tue Aug 19 23:49:39 2014
@@ -253,6 +253,12 @@ public class TestJobEndNotifier extends 
     HttpServer2 server = startHttpServer();
     MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
         this.getClass().getName(), true, 2, false));
+    // Currently, isLastRetry is always false at the beginning of MRAppMaster,
+    // unless the staging area exists or a commit has already started.
+    // Manually set isLastRetry to true; it should be reset to false when
+    // unregistration fails.
+    app.isLastAMRetry = true;
     doNothing().when(app).sysexit();
     JobConf conf = new JobConf();
     conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
@@ -264,13 +270,13 @@ public class TestJobEndNotifier extends 
     app.waitForInternalState(job, JobStateInternal.REBOOT);
     // Now shutdown. User should see FAILED state.
     // Unregistration fails: isLastAMRetry is recalculated, this is
-    app.shutDownJob();
-    Assert.assertTrue(app.isLastAMRetry());
-    Assert.assertEquals(1, JobEndServlet.calledTimes);
-    Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
-        JobEndServlet.requestUri.getQuery());
-    Assert.assertEquals(JobState.FAILED.toString(),
-      JobEndServlet.foundJobState);
+    // Reboot stops the service internally, so we don't need to shut down twice
+    app.waitForServiceToStop(10000);
+    Assert.assertFalse(app.isLastAMRetry());
+    // Since it's not the last retry, JobEndServlet should not have been called
+    Assert.assertEquals(0, JobEndServlet.calledTimes);
+    Assert.assertNull(JobEndServlet.requestUri);
+    Assert.assertNull(JobEndServlet.foundJobState);
     server.stop();
   }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Tue Aug 19 23:49:39 2014
@@ -22,7 +22,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -214,6 +214,87 @@ public class TestKill {
     app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
   }
 
+  static class MyAsyncDispatch extends AsyncDispatcher {
+    private CountDownLatch latch;
+    private TaskAttemptEventType attemptEventTypeToWait;
+    MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
+      super();
+      this.latch = latch;
+      this.attemptEventTypeToWait = attemptEventTypeToWait;
+    }
+
+    @Override
+    protected void dispatch(Event event) {
+      if (event instanceof TaskAttemptEvent) {
+        TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
+        TaskAttemptId attemptID = attemptEvent.getTaskAttemptID();
+        if (attemptEvent.getType() == this.attemptEventTypeToWait
+            && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
+          try {
+            latch.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+      super.dispatch(event);
+    }
+  }
+
+  // This is to test a race condition where JobEventType.JOB_KILL is generated
+  // right after TaskAttemptEventType.TA_DONE is generated.
+  // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED
+  // and T_ATTEMPT_KILLED from the same attempt.
+  @Test
+  public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    final Dispatcher dispatcher = new MyAsyncDispatch(latch, TaskAttemptEventType.TA_DONE);
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // The order of events in the dispatch queue, from oldest to newest:
+    // TA_DONE
+    // JOB_KILL
+    // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling )
+    // T_KILL ( from JOB_KILL's handling )
+    // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling )
+    // TA_KILL ( from T_KILL's handling )
+    // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling )
+    // T_ATTEMPT_KILLED ( from TA_KILL's handling )
+
+    // Finish map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // Now kill the job
+    app.getContext().getEventHandler()
+        .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    //unblock
+    latch.countDown();
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
   @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);

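MyAsyncDispatch above holds the dispatcher thread on a CountDownLatch when it
sees the targeted TA_DONE, so that JOB_KILL can be queued behind it before
either event is processed. A minimal standalone sketch of that gating pattern,
using only java.util.concurrent rather than the Hadoop dispatcher types:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class LatchGatedDispatcherSketch {
  enum EventType { TA_DONE, JOB_KILL }

  public static void main(String[] args) throws Exception {
    final BlockingQueue<EventType> queue = new LinkedBlockingQueue<EventType>();
    final CountDownLatch gate = new CountDownLatch(1);

    Thread dispatcher = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          while (true) {
            EventType e = queue.take();
            if (e == EventType.TA_DONE) {
              gate.await();             // stall here until the test releases us
            }
            System.out.println("dispatched " + e);
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    });
    dispatcher.setDaemon(true);
    dispatcher.start();

    queue.put(EventType.TA_DONE);       // dispatcher stalls on the gate
    queue.put(EventType.JOB_KILL);      // queued behind TA_DONE, as in the test
    gate.countDown();                   // both are now processed back to back
    Thread.sleep(200);                  // crude: let the daemon thread print
  }
}
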
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue Aug 19 23:49:39 2014
@@ -26,7 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -112,6 +112,15 @@ public class TestMRApp {
     //wait for first attempt to commit pending
     app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
 
+    //re-send the commit pending signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            attempt.getID(),
+            TaskAttemptEventType.TA_COMMIT_PENDING));
+
+    //the task attempt should still be at COMMIT_PENDING
+    app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
+
     //send the done signal to the task
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(

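The new lines above re-send TA_COMMIT_PENDING and assert the attempt stays at
COMMIT_PENDING, i.e. the duplicate signal must not trip an invalid state
transition. A toy sketch of that idempotent-transition idea (names are
illustrative, not the real TaskAttemptImpl state machine):

public class IdempotentTransitionSketch {
  enum State { RUNNING, COMMIT_PENDING, SUCCEEDED }

  static State onCommitPending(State current) {
    // Both RUNNING -> COMMIT_PENDING and COMMIT_PENDING -> COMMIT_PENDING are
    // accepted transitions, so a re-sent signal is effectively a no-op.
    switch (current) {
      case RUNNING:
      case COMMIT_PENDING:
        return State.COMMIT_PENDING;
      default:
        throw new IllegalStateException("invalid transition from " + current);
    }
  }

  public static void main(String[] args) {
    State s = onCommitPending(State.RUNNING);
    s = onCommitPending(s);              // duplicate signal
    System.out.println(s);               // still COMMIT_PENDING
  }
}
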
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java Tue Aug 19 23:49:39 2014
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.io.IOException;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Tue Aug 19 23:49:39 2014
@@ -31,7 +31,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -118,7 +118,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+            System.currentTimeMillis());
     JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -147,8 +147,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -186,8 +185,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -225,8 +223,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -264,8 +261,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -285,8 +281,9 @@ public class TestMRAppMaster {
   @Test (timeout = 30000)
   public void testMRAppMasterMaxAppAttempts() throws IOException,
       InterruptedException {
-    int[] maxAppAttemtps = new int[] { 1, 2, 3 };
-    Boolean[] expectedBools = new Boolean[]{ true, true, false };
+    // No matter what maxAppAttempts or the attempt id is, isLastRetry is
+    // always false here
+    Boolean[] expectedBools = new Boolean[]{ false, false, false };
 
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
     String containerIdStr = "container_1317529182569_0004_000002_1";
@@ -301,10 +298,10 @@ public class TestMRAppMaster {
     File stagingDir =
         new File(MRApps.getStagingAreaDir(conf, userName).toString());
     stagingDir.mkdirs();
-    for (int i = 0; i < maxAppAttemtps.length; ++i) {
+    for (int i = 0; i < expectedBools.length; ++i) {
       MRAppMasterTest appMaster =
           new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-              System.currentTimeMillis(), maxAppAttemtps[i], false, true);
+              System.currentTimeMillis(), false, true);
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
       assertEquals("isLastAMRetry is correctly computed.", expectedBools[i],
           appMaster.isLastAMRetry());
@@ -399,7 +396,7 @@ public class TestMRAppMaster {
 
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-          System.currentTimeMillis(), 1, false, true);
+          System.currentTimeMillis(), false, true);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
 
     // Now validate the task credentials
@@ -466,16 +463,15 @@ class MRAppMasterTest extends MRAppMaste
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, int maxAppAttempts) {
+      long submitTime) {
     this(applicationAttemptId, containerId, host, port, httpPort,
-        submitTime, maxAppAttempts, true, true);
+        submitTime, true, true);
   }
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, int maxAppAttempts, boolean overrideInit,
+      long submitTime, boolean overrideInit,
       boolean overrideStart) {
-    super(applicationAttemptId, containerId, host, port, httpPort, submitTime,
-        maxAppAttempts);
+    super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
     this.overrideInit = overrideInit;
     this.overrideStart = overrideStart;
     mockContainerAllocator = mock(ContainerAllocator.class);

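The constructor changes above drop maxAppAttempts from the MRAppMaster test
signatures, and testMRAppMasterMaxAppAttempts now expects isLastAMRetry to be
false at startup for every attempt id. Assuming the flag is now flipped only
by a later, authoritative signal rather than computed from a static attempt
count (the names below are illustrative, not the MRAppMaster API), the
behavior is roughly:

// Illustrative only: models an AM whose "last retry" flag starts false and is
// set later, instead of being derived from maxAppAttempts at construction.
public class LastRetrySketch {
  private volatile boolean isLastAMRetry = false;   // always false at init

  public boolean isLastAMRetry() {
    return isLastAMRetry;
  }

  // Hypothetical: invoked once an authoritative should-not-retry signal arrives.
  public void markLastAMRetry() {
    this.isLastAMRetry = true;
  }

  public static void main(String[] args) {
    LastRetrySketch am = new LastRetrySketch();
    System.out.println(am.isLastAMRetry());   // false, whatever the attempt id
    am.markLastAMRetry();
    System.out.println(am.isLastAMRetry());   // true only after the signal
  }
}
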
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,7 @@ import java.security.PrivilegedException
 import java.util.Iterator;
 import java.util.List;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Aug 19 23:49:39 2014
@@ -34,7 +34,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -425,6 +425,266 @@ public class TestRecovery {
     app.verifyCompleted();
   }
 
+  /**
+   * This class provides a custom implementation of the output committer's
+   * isRecoverySupported method, which determines whether recovery is
+   * supported based on a config property.
+   */
+  public static class TestFileOutputCommitter extends
+      org.apache.hadoop.mapred.FileOutputCommitter {
+
+    @Override
+    public boolean isRecoverySupported(
+        org.apache.hadoop.mapred.JobContext jobContext) {
+      boolean isRecoverySupported = false;
+      if (jobContext != null && jobContext.getConfiguration() != null) {
+        isRecoverySupported = jobContext.getConfiguration().getBoolean(
+            "want.am.recovery", false);
+      }
+      return isRecoverySupported;
+    }
+  }
+
+  /**
+   * This test case verifies that recovery is controlled through a config
+   * property. Here recovery is turned ON: an AM with 3 maps and 0 reduces
+   * crashes after the first two tasks finish, recovers them completely, and
+   * succeeds in the second generation.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(3, 0, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean("want.am.recovery", true);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // the job should have 3 (map) tasks
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task3Attempt = mapTask3.getAttempts().values().iterator()
+        .next();
+
+    // before sending the TA_DONE event, make sure the attempts have come to
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 1st two maps
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // on rerun, the first two maps will be recovered from the previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean("want.am.recovery", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has come to
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask3.getAttempts().values().iterator()
+                .next().getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for it to complete
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
+  /**
+   * This test case verifies that recovery is controlled through a config
+   * property. Here recovery is turned OFF: an AM with 3 maps and 0 reduces
+   * crashes after the first two tasks finish; recovery is skipped, so the job
+   * has to rerun fully in the second generation, where it succeeds.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRecoveryFailsUsingCustomOutputCommitter() throws Exception {
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
+          org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean("want.am.recovery", false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // the job should have 3 (map) tasks
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE event, make sure the attempts have come to
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 1st two maps
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // on rerun, with recovery off, the first two maps will NOT be recovered
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
+            org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean("want.am.recovery", false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // the first two maps will NOT be recovered; send done signals for them
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has come to
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to all 3 map tasks
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(mapTask1.getAttempts().values().iterator().next()
+          .getID(), TaskAttemptEventType.TA_DONE));
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(mapTask2.getAttempts().values().iterator().next()
+          .getID(), TaskAttemptEventType.TA_DONE));
+
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
+          .getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for it to complete
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   public void testMultipleCrashes() throws Exception {
 

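Both new recovery tests share one wiring pattern: a FileOutputCommitter
subclass whose isRecoverySupported(JobContext) answer comes from a boolean
config key, installed via mapred.output.committer.class. A distilled sketch of
that wiring (note that "want.am.recovery" is the test's own property, not a
standard Hadoop key):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;

public class RecoveryGateSketch {
  // Committer whose recovery support is read from a boolean config key.
  public static class ConfigGatedCommitter extends FileOutputCommitter {
    @Override
    public boolean isRecoverySupported(JobContext jobContext) {
      return jobContext != null
          && jobContext.getConfiguration() != null
          && jobContext.getConfiguration().getBoolean("want.am.recovery", false);
    }
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Same wiring the tests use: install the committer, then flip the gate.
    conf.setClass("mapred.output.committer.class",
        ConfigGatedCommitter.class, OutputCommitter.class);
    conf.setBoolean("want.am.recovery", true);  // true: recover; false: full rerun
  }
}
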
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Tue Aug 19 23:49:39 2014
@@ -879,5 +879,10 @@ public class TestRuntimeEstimators {
       return true;
     }
 
+    @Override
+    public String getNMHostname() {
+      // bogus - not required by these tests
+      return null;
+    }
   }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
@@ -28,9 +29,6 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,13 +60,14 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Assert;
 import org.junit.Test;
 
 
 /**
  * Make sure that the job staging directory clean up happens.
  */
- public class TestStagingCleanup extends TestCase {
+ public class TestStagingCleanup {
    
    private Configuration conf = new Configuration();
    private FileSystem fs;
@@ -81,7 +80,7 @@ import org.junit.Test;
    public void testDeletionofStagingOnUnregistrationFailure()
        throws IOException {
      testDeletionofStagingOnUnregistrationFailure(2, false);
-     testDeletionofStagingOnUnregistrationFailure(1, true);
+     testDeletionofStagingOnUnregistrationFailure(1, false);
    }
 
    @SuppressWarnings("resource")
@@ -104,7 +103,7 @@ import org.junit.Test;
      appMaster.init(conf);
      appMaster.start();
      appMaster.shutDownJob();
-     ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
+     ((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry();
      if (shouldHaveDeleted) {
        Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
        verify(fs).delete(stagingJobPath, true);
@@ -164,7 +163,11 @@ import org.junit.Test;
      verify(fs, times(0)).delete(stagingJobPath, true);
    }
 
-   @Test (timeout = 30000)
+   // FIXME:
+   // Disabled this test because currently, when the job state is REBOOT at
+   // shutdown and lastRetry = true in the RM's view, cleanup is not performed.
+   // This will be supported once YARN-2261 is completed.
+//   @Test (timeout = 30000)
    public void testDeletionofStagingOnReboot() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      fs = mock(FileSystem.class);
@@ -202,7 +205,7 @@ import org.junit.Test;
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
      appMaster.init(conf);
      //simulate the process being killed
      MRAppMaster.MRAppMasterShutdownHook hook = 
@@ -210,8 +213,12 @@ import org.junit.Test;
      hook.run();
      verify(fs, times(0)).delete(stagingJobPath, true);
    }
-   
-   @Test (timeout = 30000)
+
+  // FIXME:
+  // Disabled this test because currently, when the shutdown hook is triggered
+  // on the last retry in the RM's view, cleanup is not performed. This should
+  // be supported once YARN-2261 is completed.
+//   @Test (timeout = 30000)
    public void testDeletionofStagingOnKillLastTry() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      fs = mock(FileSystem.class);
@@ -226,7 +233,7 @@ import org.junit.Test;
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 1); //no retry
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); //no retry
      appMaster.init(conf);
      assertTrue("appMaster.isLastAMRetry() is false", appMaster.isLastAMRetry());
      //simulate the process being killed
@@ -245,10 +252,10 @@ import org.junit.Test;
      boolean crushUnregistration = false;
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
-         ContainerAllocator allocator, int maxAppAttempts) {
+         ContainerAllocator allocator) {
        super(applicationAttemptId, ContainerId.newInstance(
            applicationAttemptId, 1), "testhost", 2222, 3333,
-           System.currentTimeMillis(), maxAppAttempts);
+           System.currentTimeMillis());
        this.allocator = allocator;
        this.successfullyUnregistered.set(true);
      }
@@ -256,7 +263,7 @@ import org.junit.Test;
      public TestMRApp(ApplicationAttemptId applicationAttemptId,
          ContainerAllocator allocator, JobStateInternal jobStateInternal,
              int maxAppAttempts) {
-       this(applicationAttemptId, allocator, maxAppAttempts);
+       this(applicationAttemptId, allocator);
        this.jobStateInternal = jobStateInternal;
      }
 

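The two FIXME'd tests above probe the same shutdown-time decision: the staging
directory may be deleted only on the last AM retry, and after the constructor
changes that flag is no longer knowable up front. A hedged sketch of that
guard (illustrative names; the real logic lives in MRAppMaster and its
MRAppMasterShutdownHook):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StagingCleanupHookSketch {
  public static void install(final FileSystem fs, final Path stagingDir,
      final boolean isLastAMRetry) {
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          if (isLastAMRetry) {
            // Only the final attempt may delete the shared staging dir;
            // earlier attempts must leave it for the next retry to recover.
            fs.delete(stagingDir, true);
          }
        } catch (IOException e) {
          // best effort during JVM shutdown
        }
      }
    }));
  }
}
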
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java Tue Aug 19 23:49:39 2014
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;


