hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1611531 [2/3] - in /hadoop/common/branches/HDFS-6584/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hado...
Date Fri, 18 Jul 2014 02:21:28 GMT
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Jul 18 02:21:21 2014
@@ -926,8 +926,10 @@ public class RMAppAttemptImpl implements
           appAttempt.masterService
               .registerAppAttempt(appAttempt.applicationAttemptId);
 
-          appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false, false));
+          // Add attempt to scheduler synchronously to guarantee scheduler
+          // knows attempts before AM or NM re-registers.
+          appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+            appAttempt.getAppAttemptId(), false, true));
         }
 
         /*

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Fri Jul 18 02:21:21 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
@@ -73,5 +76,7 @@ public interface RMContainer extends Eve
   ContainerReport createContainerReport();
   
   boolean isAMContainer();
+  
+  List<ResourceRequest> getResourceRequests();
 
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Fri Jul 18 02:21:21 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -158,6 +160,7 @@ public class RMContainerImpl implements 
   private long finishTime;
   private ContainerStatus finishedStatus;
   private boolean isAMContainer;
+  private List<ResourceRequest> resourceRequests;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -180,7 +183,8 @@ public class RMContainerImpl implements 
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
     this.isAMContainer = false;
-    
+    this.resourceRequests = null;
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -311,6 +315,25 @@ public class RMContainerImpl implements 
       readLock.unlock();
     }
   }
+  
+  @Override
+  public List<ResourceRequest> getResourceRequests() {
+    try {
+      readLock.lock();
+      return resourceRequests;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  public void setResourceRequests(List<ResourceRequest> requests) {
+    try {
+      writeLock.lock();
+      this.resourceRequests = requests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
   @Override
   public String toString() {
@@ -432,6 +455,9 @@ public class RMContainerImpl implements 
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Clear ResourceRequest stored in RMContainer
+      container.setResourceRequests(null);
+      
       // Register with containerAllocationExpirer.
       container.containerAllocationExpirer.register(container.getContainerId());
 

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Fri Jul 18 02:21:21 2014
@@ -20,9 +20,8 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -105,8 +104,8 @@ public class RMNodeImpl implements RMNod
   private String nodeManagerVersion;
 
   /* set of containers that have just launched */
-  private final Map<ContainerId, ContainerStatus> justLaunchedContainers = 
-    new HashMap<ContainerId, ContainerStatus>();
+  private final Set<ContainerId> launchedContainers =
+    new HashSet<ContainerId>();
 
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@@ -476,6 +475,13 @@ public class RMNodeImpl implements RMNod
         // Increment activeNodes explicitly because this is a new node.
         ClusterMetrics.getMetrics().incrNumActiveNodes();
         containers = startEvent.getNMContainerStatuses();
+        if (containers != null && !containers.isEmpty()) {
+          for (NMContainerStatus container : containers) {
+            if (container.getContainerState() == ContainerState.RUNNING) {
+              rmNode.launchedContainers.add(container.getContainerId());
+            }
+          }
+        }
       }
       
       if (null != startEvent.getRunningApplications()) {
@@ -664,14 +670,14 @@ public class RMNodeImpl implements RMNod
 
         // Process running containers
         if (remoteContainer.getState() == ContainerState.RUNNING) {
-          if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
+          if (!rmNode.launchedContainers.contains(containerId)) {
             // Just launched container. RM knows about it the first time.
-            rmNode.justLaunchedContainers.put(containerId, remoteContainer);
+            rmNode.launchedContainers.add(containerId);
             newlyLaunchedContainers.add(remoteContainer);
           }
         } else {
           // A finished container
-          rmNode.justLaunchedContainers.remove(containerId);
+          rmNode.launchedContainers.remove(containerId);
           completedContainers.add(remoteContainer);
         }
       }
@@ -748,4 +754,10 @@ public class RMNodeImpl implements RMNod
   public int getQueueSize() {
     return nodeUpdateQueue.size();
   }
+
+  // For test only.
+  @VisibleForTesting
+  public Set<ContainerId> getLaunchedContainers() {
+    return this.launchedContainers;
+  }
  }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Fri Jul 18 02:21:21 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -275,6 +276,27 @@ public abstract class AbstractYarnSchedu
     return rmContainer;
   }
 
+  /**
+   * Recover resource request back from RMContainer when a container is 
+   * preempted before AM pulled the same. If container is pulled by
+   * AM, then RMContainer will not have resource request to recover.
+   * @param rmContainer
+   */
+  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+
+    // If container state is moved to ACQUIRED, request will be empty.
+    if (requests == null) {
+      return;
+    }
+    // Add resource request back to Scheduler.
+    SchedulerApplicationAttempt schedulerAttempt 
+        = getCurrentAttemptForContainer(rmContainer.getContainerId());
+    if (schedulerAttempt != null) {
+      schedulerAttempt.recoverResourceRequests(requests);
+    }
+  }
+
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Jul 18 02:21:21 2014
@@ -127,9 +127,10 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
+   * @param recoverPreemptedRequest recover Resource Request on preemption
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests) {
+      List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
     QueueMetrics metrics = queue.getMetrics();
     
     // Update resource requests
@@ -163,8 +164,13 @@ public class AppSchedulingInfo {
         asks = new HashMap<String, ResourceRequest>();
         this.requests.put(priority, asks);
         this.priorities.add(priority);
-      } else if (updatePendingResources) {
-        lastRequest = asks.get(resourceName);
+      }
+      lastRequest = asks.get(resourceName);
+
+      if (recoverPreemptedRequest && lastRequest != null) {
+        // Increment the number of containers to 1, as it is recovering a
+        // single container.
+        request.setNumContainers(lastRequest.getNumContainers() + 1);
       }
 
       asks.put(resourceName, request);
@@ -254,14 +260,16 @@ public class AppSchedulingInfo {
    * @param container
    *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, Container container) {
+  synchronized public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, Priority priority, ResourceRequest request,
+      Container container) {
+    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container);
+      allocateNodeLocal(node, priority, request, container, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container);
+      allocateRackLocal(node, priority, request, container, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container);
+      allocateOffSwitch(node, priority, request, container, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -279,6 +287,7 @@ public class AppSchedulingInfo {
           + " resource=" + request.getCapability());
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    return resourceRequests;
   }
 
   /**
@@ -288,9 +297,9 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal( 
-      SchedulerNode node, Priority priority, 
-      ResourceRequest nodeLocalRequest, Container container) {
+  synchronized private void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
@@ -304,7 +313,14 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -314,16 +330,22 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(
-      SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, Container container) {
+  synchronized private void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -333,11 +355,13 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(
-      SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, Container container) {
+  synchronized private void allocateOffSwitch(SchedulerNode node,
+      Priority priority, ResourceRequest offSwitchRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   synchronized private void decrementOutstanding(
@@ -436,4 +460,11 @@ public class AppSchedulingInfo {
     metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
       false);
   }
+  
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newInstance(
+        request.getPriority(), request.getResourceName(),
+        request.getCapability(), 1, request.getRelaxLocality());
+    return newRequest;
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Fri Jul 18 02:21:21 2014
@@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
     if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
+      appSchedulingInfo.updateResourceRequests(requests, false);
+    }
+  }
+  
+  public synchronized void recoverResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests, true);
     }
   }
   

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Jul 18 02:21:21 2014
@@ -521,7 +521,7 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     // santiy checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
@@ -553,14 +553,20 @@ public class CapacityScheduler extends
     applications.put(applicationId, application);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
@@ -578,14 +584,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -905,7 +912,8 @@ public class CapacityScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -921,7 +929,7 @@ public class CapacityScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -1089,6 +1097,7 @@ public class CapacityScheduler extends
     if (LOG.isDebugEnabled()) {
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
+    recoverResourceRequestForContainer(cont);
     completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
       cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
       RMContainerEventType.KILL);

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Fri Jul 18 02:21:21 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
       return false;
     }
+    
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
 
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
+    
+    // Update resource requests related to "request" and store in RMContainer 
+    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
 
     // Inform the container
     rmContainer.handle(

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Fri Jul 18 02:21:21 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
+    this(applicationId, queue, user, false);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.isAppRecovering = isAppRecovering;
   }
 
   public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
     return user;
   }
 
+  public boolean getIsAppRecovering() {
+    return isAppRecovering;
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Fri Jul 18 02:21:21 2014
@@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEve
 
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
-  private final boolean shouldNotifyAttemptAdded;
+  private final boolean isAttemptRecovering;
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
-    this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+    this(applicationAttemptId, transferStateFromPreviousAttempt, false);
   }
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
-    this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
+    this.isAttemptRecovering = isAttemptRecovering;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEve
     return transferStateFromPreviousAttempt;
   }
 
-  public boolean getShouldNotifyAttemptAdded() {
-    return shouldNotifyAttemptAdded;
+  public boolean getIsAttemptRecovering() {
+    return isAttemptRecovering;
   }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Jul 18 02:21:21 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -82,6 +83,9 @@ public class FSSchedulerApp extends Sche
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
     
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
+    
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(
@@ -281,9 +285,13 @@ public class FSSchedulerApp extends Sche
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
 
+    // Update resource requests related to "request" and store in RMContainer
+    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
     // Inform the container
     rmContainer.handle(
         new RMContainerEvent(container.getId(), RMContainerEventType.START));

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jul 18 02:21:21 2014
@@ -422,7 +422,7 @@ public class FairScheduler extends
     }
   }
   
-  private void warnOrKillContainer(RMContainer container) {
+  protected void warnOrKillContainer(RMContainer container) {
     ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
     FSSchedulerApp app = getSchedulerApp(appAttemptId);
     FSLeafQueue queue = app.getQueue();
@@ -440,6 +440,7 @@ public class FairScheduler extends
           SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
+        recoverResourceRequestForContainer(container);
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
@@ -565,7 +566,7 @@ public class FairScheduler extends
    * configured limits, but the app will not be marked as runnable.
    */
   protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     if (queueName == null || queueName.isEmpty()) {
       String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
@@ -602,8 +603,14 @@ public class FairScheduler extends
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName + ", currently num of applications: "
         + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   /**
@@ -612,7 +619,7 @@ public class FairScheduler extends
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
@@ -641,14 +648,15 @@ public class FairScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
 
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -1135,7 +1143,8 @@ public class FairScheduler extends
       }
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1153,7 +1162,7 @@ public class FairScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Jul 18 02:21:21 2014
@@ -356,22 +356,28 @@ public class FifoScheduler extends
 
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
-      String queue, String user) {
+      String queue, String user, boolean isAppRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", currently num of applications: " + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   @VisibleForTesting
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt,
-          boolean shouldNotifyAttemptAdded) {
+          boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
@@ -389,14 +395,15 @@ public class FifoScheduler extends
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(appAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(appAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -772,7 +779,8 @@ public class FifoScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -788,7 +796,7 @@ public class FifoScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Fri Jul 18 02:21:21 2014
@@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -193,4 +195,14 @@ public class RMDelegationTokenSecretMana
       addPersistedDelegationToken(entry.getKey(), entry.getValue());
     }
   }
+
+  public long getRenewDate(RMDelegationTokenIdentifier ident)
+      throws InvalidToken {
+    DelegationTokenInformation info = currentTokens.get(ident);
+    if (info == null) {
+      throw new InvalidToken("token (" + ident.toString()
+          + ") can't be found in cache");
+    }
+    return info.getRenewDate();
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Fri Jul 18 02:21:21 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.AccessControlException;
 import java.nio.ByteBuffer;
+import java.security.Principal;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
@@ -36,6 +37,7 @@ import java.util.concurrent.ConcurrentMa
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -57,6 +59,8 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -67,6 +71,13 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -85,6 +96,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
@@ -109,6 +121,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
@@ -118,6 +131,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -139,6 +153,9 @@ public class RMWebServices {
   private final Configuration conf;
   private @Context HttpServletResponse response;
 
+  public final static String DELEGATION_TOKEN_HEADER =
+      "Hadoop-YARN-RM-Delegation-Token";
+
   @Inject
   public RMWebServices(final ResourceManager rm, Configuration conf) {
     this.rm = rm;
@@ -147,11 +164,7 @@ public class RMWebServices {
 
   protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) {
     // Check for the authorization.
-    String remoteUser = hsr.getRemoteUser();
-    UserGroupInformation callerUGI = null;
-    if (remoteUser != null) {
-      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-    }
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI != null
         && !(this.rm.getApplicationACLsManager().checkAccess(callerUGI,
               ApplicationAccessType.VIEW_APP, app.getUser(),
@@ -626,7 +639,7 @@ public class RMWebServices {
   public AppState getAppState(@Context HttpServletRequest hsr,
       @PathParam("appid") String appId) throws AuthorizationException {
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     String userName = "";
     if (callerUGI != null) {
       userName = callerUGI.getUserName();
@@ -661,7 +674,7 @@ public class RMWebServices {
       IOException {
 
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       String msg = "Unable to obtain user name, user not authenticated";
       throw new AuthorizationException(msg);
@@ -771,9 +784,14 @@ public class RMWebServices {
   }
 
   private UserGroupInformation getCallerUserGroupInformation(
-      HttpServletRequest hsr) {
+      HttpServletRequest hsr, boolean usePrincipal) {
 
     String remoteUser = hsr.getRemoteUser();
+    if (usePrincipal) {
+      Principal princ = hsr.getUserPrincipal();
+      remoteUser = princ == null ? null : princ.getName();
+    }
+
     UserGroupInformation callerUGI = null;
     if (remoteUser != null) {
       callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
@@ -799,7 +817,7 @@ public class RMWebServices {
   public Response createNewApplication(@Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       throw new AuthorizationException("Unable to obtain user name, "
           + "user not authenticated");
@@ -835,7 +853,7 @@ public class RMWebServices {
       IOException, InterruptedException {
 
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       throw new AuthorizationException("Unable to obtain user name, "
           + "user not authenticated");
@@ -887,8 +905,8 @@ public class RMWebServices {
       throw new YarnRuntimeException(msg, e);
     }
     NewApplication appId =
-        new NewApplication(resp.getApplicationId().toString(), new ResourceInfo(
-          resp.getMaximumResourceCapability()));
+        new NewApplication(resp.getApplicationId().toString(),
+          new ResourceInfo(resp.getMaximumResourceCapability()));
     return appId;
   }
 
@@ -962,7 +980,8 @@ public class RMWebServices {
    * @throws IOException
    */
   protected ContainerLaunchContext createContainerLaunchContext(
-      ApplicationSubmissionContextInfo newApp) throws BadRequestException, IOException {
+      ApplicationSubmissionContextInfo newApp) throws BadRequestException,
+      IOException {
 
     // create container launch context
 
@@ -1033,4 +1052,238 @@ public class RMWebServices {
     }
     return ret;
   }
+
+  private UserGroupInformation createKerberosUserGroupInformation(
+      HttpServletRequest hsr) throws AuthorizationException, YarnException {
+
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
+    if (callerUGI == null) {
+      String msg = "Unable to obtain user name, user not authenticated";
+      throw new AuthorizationException(msg);
+    }
+
+    String authType = hsr.getAuthType();
+    if (!KerberosAuthenticationHandler.TYPE.equals(authType)) {
+      String msg =
+          "Delegation token operations can only be carried out on a "
+              + "Kerberos authenticated channel";
+      throw new YarnException(msg);
+    }
+
+    callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
+    return callerUGI;
+  }
+
+  @POST
+  @Path("/delegation-token")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response postDelegationToken(DelegationToken tokenData,
+      @Context HttpServletRequest hsr) throws AuthorizationException,
+      IOException, InterruptedException, Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+    return createDelegationToken(tokenData, hsr, callerUGI);
+  }
+
+  @POST
+  @Path("/delegation-token/expiration")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response
+      postDelegationTokenExpiration(@Context HttpServletRequest hsr)
+          throws AuthorizationException, IOException, InterruptedException,
+          Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+
+    DelegationToken requestToken = new DelegationToken();
+    requestToken.setToken(extractToken(hsr).encodeToUrlString());
+    return renewDelegationToken(requestToken, hsr, callerUGI);
+  }
+
+  private Response createDelegationToken(DelegationToken tokenData,
+      HttpServletRequest hsr, UserGroupInformation callerUGI)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    final String renewer = tokenData.getRenewer();
+    GetDelegationTokenResponse resp;
+    try {
+      resp =
+          callerUGI
+            .doAs(new PrivilegedExceptionAction<GetDelegationTokenResponse>() {
+              @Override
+              public GetDelegationTokenResponse run() throws IOException,
+                  YarnException {
+                GetDelegationTokenRequest createReq =
+                    GetDelegationTokenRequest.newInstance(renewer);
+                return rm.getClientRMService().getDelegationToken(createReq);
+              }
+            });
+    } catch (Exception e) {
+      LOG.info("Create delegation token request failed", e);
+      throw e;
+    }
+
+    Token<RMDelegationTokenIdentifier> tk =
+        new Token<RMDelegationTokenIdentifier>(resp.getRMDelegationToken()
+          .getIdentifier().array(), resp.getRMDelegationToken().getPassword()
+          .array(), new Text(resp.getRMDelegationToken().getKind()), new Text(
+          resp.getRMDelegationToken().getService()));
+    RMDelegationTokenIdentifier identifier = tk.decodeIdentifier();
+    long currentExpiration =
+        rm.getRMContext().getRMDelegationTokenSecretManager()
+          .getRenewDate(identifier);
+    DelegationToken respToken =
+        new DelegationToken(tk.encodeToUrlString(), renewer, identifier
+          .getOwner().toString(), tk.getKind().toString(), currentExpiration,
+          identifier.getMaxDate());
+    return Response.status(Status.OK).entity(respToken).build();
+  }
+
+  private Response renewDelegationToken(DelegationToken tokenData,
+      HttpServletRequest hsr, UserGroupInformation callerUGI)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    Token<RMDelegationTokenIdentifier> token =
+        extractToken(tokenData.getToken());
+
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
+          .toString(), token.getPassword(), token.getService().toString());
+    final RenewDelegationTokenRequest req =
+        RenewDelegationTokenRequest.newInstance(dToken);
+
+    RenewDelegationTokenResponse resp;
+    try {
+      resp =
+          callerUGI
+            .doAs(new PrivilegedExceptionAction<RenewDelegationTokenResponse>() {
+              @Override
+              public RenewDelegationTokenResponse run() throws IOException,
+                  YarnException {
+                return rm.getClientRMService().renewDelegationToken(req);
+              }
+            });
+    } catch (UndeclaredThrowableException ue) {
+      if (ue.getCause() instanceof YarnException) {
+        if (ue.getCause().getCause() instanceof InvalidToken) {
+          throw new BadRequestException(ue.getCause().getCause().getMessage());
+        } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) {
+          return Response.status(Status.FORBIDDEN)
+            .entity(ue.getCause().getCause().getMessage()).build();
+        }
+        LOG.info("Renew delegation token request failed", ue);
+        throw ue;
+      }
+      LOG.info("Renew delegation token request failed", ue);
+      throw ue;
+    } catch (Exception e) {
+      LOG.info("Renew delegation token request failed", e);
+      throw e;
+    }
+    long renewTime = resp.getNextExpirationTime();
+
+    DelegationToken respToken = new DelegationToken();
+    respToken.setNextExpirationTime(renewTime);
+    return Response.status(Status.OK).entity(respToken).build();
+  }
+
+  // For cancelling tokens, the encoded token is passed as a header
+  // There are two reasons for this -
+  // 1. Passing a request body as part of a DELETE request is not
+  // allowed by Jetty
+  // 2. Passing the encoded token as part of the url is not ideal
+  // since urls tend to get logged and anyone with access to
+  // the logs can extract tokens which are meant to be secret
+  @DELETE
+  @Path("/delegation-token")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response cancelDelegationToken(@Context HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+
+    Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
+
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
+          .toString(), token.getPassword(), token.getService().toString());
+    final CancelDelegationTokenRequest req =
+        CancelDelegationTokenRequest.newInstance(dToken);
+
+    try {
+      callerUGI
+        .doAs(new PrivilegedExceptionAction<CancelDelegationTokenResponse>() {
+          @Override
+          public CancelDelegationTokenResponse run() throws IOException,
+              YarnException {
+            return rm.getClientRMService().cancelDelegationToken(req);
+          }
+        });
+    } catch (UndeclaredThrowableException ue) {
+      if (ue.getCause() instanceof YarnException) {
+        if (ue.getCause().getCause() instanceof InvalidToken) {
+          throw new BadRequestException(ue.getCause().getCause().getMessage());
+        } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) {
+          return Response.status(Status.FORBIDDEN)
+            .entity(ue.getCause().getCause().getMessage()).build();
+        }
+        LOG.info("Renew delegation token request failed", ue);
+        throw ue;
+      }
+      LOG.info("Renew delegation token request failed", ue);
+      throw ue;
+    } catch (Exception e) {
+      LOG.info("Renew delegation token request failed", e);
+      throw e;
+    }
+
+    return Response.status(Status.OK).build();
+  }
+
+  private Token<RMDelegationTokenIdentifier> extractToken(
+      HttpServletRequest request) {
+    String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER);
+    if (encodedToken == null) {
+      String msg =
+          "Header '" + DELEGATION_TOKEN_HEADER
+              + "' containing encoded token not found";
+      throw new BadRequestException(msg);
+    }
+    return extractToken(encodedToken);
+  }
+
+  private Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) {
+    Token<RMDelegationTokenIdentifier> token =
+        new Token<RMDelegationTokenIdentifier>();
+    try {
+      token.decodeFromUrlString(encodedToken);
+    } catch (Exception ie) {
+      String msg = "Could not decode encoded token";
+      throw new BadRequestException(msg);
+    }
+    return token;
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Fri Jul 18 02:21:21 2014
@@ -228,7 +228,7 @@ public class TestFifoScheduler {
     scheduler.handle(new NodeAddedSchedulerEvent(node));
 
     ApplicationId appId = ApplicationId.newInstance(0, 1);
-    scheduler.addApplication(appId, "queue1", "user1");
+    scheduler.addApplication(appId, "queue1", "user1", false);
 
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
     try {
@@ -238,7 +238,7 @@ public class TestFifoScheduler {
     }
 
     ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
-    scheduler.addApplicationAttempt(attId, false, true);
+    scheduler.addApplicationAttempt(attId, false, false);
 
     rm.stop();
   }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Fri Jul 18 02:21:21 2014
@@ -29,11 +29,13 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -164,6 +167,11 @@ public class TestWorkPreservingRMRestart
 
     // Wait for RM to settle down on recovering containers;
     waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+    Set<ContainerId> launchedContainers =
+        ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
+          .getLaunchedContainers();
+    assertTrue(launchedContainers.contains(amContainer.getContainerId()));
+    assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
 
     // check RMContainers are re-recreated and the container state is correct.
     rm2.waitForState(nm1, amContainer.getContainerId(),
@@ -602,6 +610,36 @@ public class TestWorkPreservingRMRestart
         attempt0.getMasterContainer().getId()).isAMContainer());
   }
 
+  @Test (timeout = 20000)
+  public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
+    // start RM
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    // scheduler app/attempt is immediately available after RM is re-started.
+    Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo(
+      am0.getApplicationAttemptId()));
+
+    // getTransferredContainers should not throw NPE.
+    ((AbstractYarnScheduler) rm2.getResourceScheduler())
+      .getTransferredContainers(am0.getApplicationAttemptId());
+
+    List<NMContainerStatus> containers = createNMContainerStatusForApp(am0);
+    nm1.registerNode(containers, null);
+    waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
+  }
 
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java Fri Jul 18 02:21:21 2014
@@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -204,4 +214,36 @@ public class TestRMContainerImpl {
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
     verify(writer, never()).containerFinished(any(RMContainer.class));
   }
+  
+  @Test
+  public void testExistenceOfResourceRequestInRMContainer() throws Exception {
+    Configuration conf = new Configuration();
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    ResourceScheduler scheduler = rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // Verify whether list of ResourceRequest is present in RMContainer
+    // while moving to ALLOCATED state
+    Assert.assertNotNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+
+    // Allocate container
+    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
+        .getAllocatedContainers();
+    rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
+
+    // After RMContainer moving to ACQUIRED state, list of ResourceRequest will
+    // be empty
+    Assert.assertNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Jul 18 02:21:21 2014
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -79,6 +80,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -87,6 +90,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -947,4 +951,67 @@ public class TestCapacityScheduler {
 
     rm1.stop();
   }
+  
+  @Test(timeout = 30000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId1 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);
+
+    RMContainer rmContainer = cs.getRMContainer(containerId1);
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode());
+    for (ResourceRequest request : requests) {
+      // Skip the OffRack and RackLocal resource requests.
+      if (request.getResourceName().equals(node.getRackName())
+          || request.getResourceName().equals(ResourceRequest.ANY)) {
+        continue;
+      }
+
+      // Already the node local resource request is cleared from RM after
+      // allocation.
+      Assert.assertNull(app.getResourceRequest(request.getPriority(),
+          request.getResourceName()));
+    }
+
+    // Call killContainer to preempt the container
+    cs.killContainer(rmContainer);
+
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      // Resource request must have added back in RM after preempt event
+      // handling.
+      Assert.assertEquals(
+          1,
+          app.getResourceRequest(request.getPriority(),
+              request.getResourceName()).getNumContainers());
+    }
+
+    // New container will be allocated and will move to ALLOCATED state
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 3);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // allocate container
+    List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1611531&r1=1611530&r2=1611531&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Fri Jul 18 02:21:21 2014
@@ -147,11 +147,11 @@ public class FairSchedulerTestBase {
       int memory, int vcores, String queueId, String userId, int numContainers,
       int priority) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
     if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
-      scheduler.addApplicationAttempt(id, false, true);
+      scheduler.addApplicationAttempt(id, false, false);
     }
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
@@ -167,6 +167,27 @@ public class FairSchedulerTestBase {
         .put(id.getApplicationId(), rmApp);
     return id;
   }
+  
+  protected ApplicationAttemptId createSchedulingRequest(String queueId,
+      String userId, List<ResourceRequest> ask) {
+    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
+        this.ATTEMPT_ID++);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
+    // This conditional is for testAclSubmitApplication where app is rejected
+    // and no app is added.
+    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+      scheduler.addApplicationAttempt(id, false, false);
+    }
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
+    RMApp rmApp = mock(RMApp.class);
+    RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
+    when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
+    when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
+        new RMAppAttemptMetrics(id));
+    resourceManager.getRMContext().getRMApps()
+        .put(id.getApplicationId(), rmApp);
+    return id;
+  }
 
   protected void createSchedulingRequestExistingApplication(
        int memory, int priority, ApplicationAttemptId attId) {



Mime
View raw message