hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject svn commit: r1613514 [3/5] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/appl...
Date Fri, 25 Jul 2014 20:33:29 GMT
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Fri Jul 25 20:33:09 2014
@@ -20,9 +20,8 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -105,8 +104,8 @@ public class RMNodeImpl implements RMNod
   private String nodeManagerVersion;
 
   /* set of containers that have just launched */
-  private final Map<ContainerId, ContainerStatus> justLaunchedContainers = 
-    new HashMap<ContainerId, ContainerStatus>();
+  private final Set<ContainerId> launchedContainers =
+    new HashSet<ContainerId>();
 
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@@ -476,6 +475,13 @@ public class RMNodeImpl implements RMNod
         // Increment activeNodes explicitly because this is a new node.
         ClusterMetrics.getMetrics().incrNumActiveNodes();
         containers = startEvent.getNMContainerStatuses();
+        if (containers != null && !containers.isEmpty()) {
+          for (NMContainerStatus container : containers) {
+            if (container.getContainerState() == ContainerState.RUNNING) {
+              rmNode.launchedContainers.add(container.getContainerId());
+            }
+          }
+        }
       }
       
       if (null != startEvent.getRunningApplications()) {
@@ -664,14 +670,14 @@ public class RMNodeImpl implements RMNod
 
         // Process running containers
         if (remoteContainer.getState() == ContainerState.RUNNING) {
-          if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
+          if (!rmNode.launchedContainers.contains(containerId)) {
             // Just launched container. RM knows about it the first time.
-            rmNode.justLaunchedContainers.put(containerId, remoteContainer);
+            rmNode.launchedContainers.add(containerId);
             newlyLaunchedContainers.add(remoteContainer);
           }
         } else {
           // A finished container
-          rmNode.justLaunchedContainers.remove(containerId);
+          rmNode.launchedContainers.remove(containerId);
           completedContainers.add(remoteContainer);
         }
       }
@@ -748,4 +754,10 @@ public class RMNodeImpl implements RMNod
   public int getQueueSize() {
     return nodeUpdateQueue.size();
   }
+
+  // For test only.
+  @VisibleForTesting
+  public Set<ContainerId> getLaunchedContainers() {
+    return this.launchedContainers;
+  }
  }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Fri Jul 25 20:33:09 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -122,6 +123,23 @@ public abstract class AbstractYarnSchedu
     return maximumAllocation;
   }
 
+  protected void containerLaunchedOnNode(ContainerId containerId,
+                                         SchedulerNode node) {
+    // Get the application for the finished container
+    SchedulerApplicationAttempt application = getCurrentAttemptForContainer
+        (containerId);
+    if (application == null) {
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+      return;
+    }
+
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
+  }
+
   public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
     SchedulerApplication<T> app =
         applications.get(applicationAttemptId.getApplicationId());
@@ -275,6 +293,27 @@ public abstract class AbstractYarnSchedu
     return rmContainer;
   }
 
+  /**
+   * Recover resource request back from RMContainer when a container is 
+   * preempted before AM pulled the same. If container is pulled by
+   * AM, then RMContainer will not have resource request to recover.
+   * @param rmContainer
+   */
+  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+
+    // If container state is moved to ACQUIRED, request will be empty.
+    if (requests == null) {
+      return;
+    }
+    // Add resource request back to Scheduler.
+    SchedulerApplicationAttempt schedulerAttempt 
+        = getCurrentAttemptForContainer(rmContainer.getContainerId());
+    if (schedulerAttempt != null) {
+      schedulerAttempt.recoverResourceRequests(requests);
+    }
+  }
+
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Jul 25 20:33:09 2014
@@ -127,9 +127,10 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
+   * @param recoverPreemptedRequest recover Resource Request on preemption
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests) {
+      List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
     QueueMetrics metrics = queue.getMetrics();
     
     // Update resource requests
@@ -163,8 +164,13 @@ public class AppSchedulingInfo {
         asks = new HashMap<String, ResourceRequest>();
         this.requests.put(priority, asks);
         this.priorities.add(priority);
-      } else if (updatePendingResources) {
-        lastRequest = asks.get(resourceName);
+      }
+      lastRequest = asks.get(resourceName);
+
+      if (recoverPreemptedRequest && lastRequest != null) {
+        // Increment the number of containers to 1, as it is recovering a
+        // single container.
+        request.setNumContainers(lastRequest.getNumContainers() + 1);
       }
 
       asks.put(resourceName, request);
@@ -254,14 +260,16 @@ public class AppSchedulingInfo {
    * @param container
    *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, Container container) {
+  synchronized public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, Priority priority, ResourceRequest request,
+      Container container) {
+    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container);
+      allocateNodeLocal(node, priority, request, container, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container);
+      allocateRackLocal(node, priority, request, container, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container);
+      allocateOffSwitch(node, priority, request, container, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -279,6 +287,7 @@ public class AppSchedulingInfo {
           + " resource=" + request.getCapability());
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    return resourceRequests;
   }
 
   /**
@@ -288,9 +297,9 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal( 
-      SchedulerNode node, Priority priority, 
-      ResourceRequest nodeLocalRequest, Container container) {
+  synchronized private void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
@@ -304,7 +313,14 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -314,16 +330,22 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(
-      SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, Container container) {
+  synchronized private void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -333,11 +355,13 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(
-      SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, Container container) {
+  synchronized private void allocateOffSwitch(SchedulerNode node,
+      Priority priority, ResourceRequest offSwitchRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   synchronized private void decrementOutstanding(
@@ -436,4 +460,11 @@ public class AppSchedulingInfo {
     metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
       false);
   }
+  
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newInstance(
+        request.getPriority(), request.getResourceName(),
+        request.getCapability(), 1, request.getRelaxLocality());
+    return newRequest;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Fri Jul 25 20:33:09 2014
@@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
     if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
+      appSchedulingInfo.updateResourceRequests(requests, false);
+    }
+  }
+  
+  public synchronized void recoverResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests, true);
     }
   }
   

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Jul 25 20:33:09 2014
@@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -521,7 +520,7 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     // santiy checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
@@ -553,14 +552,20 @@ public class CapacityScheduler extends
     applications.put(applicationId, application);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
@@ -578,14 +583,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -859,21 +865,6 @@ public class CapacityScheduler extends
   
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Override
   public void handle(SchedulerEvent event) {
     switch(event.getType()) {
@@ -905,7 +896,8 @@ public class CapacityScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -921,7 +913,7 @@ public class CapacityScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -1089,6 +1081,7 @@ public class CapacityScheduler extends
     if (LOG.isDebugEnabled()) {
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
+    recoverResourceRequestForContainer(cont);
     completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
       cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
       RMContainerEventType.KILL);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Fri Jul 25 20:33:09 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
       return false;
     }
+    
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
 
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
+    
+    // Update resource requests related to "request" and store in RMContainer 
+    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
 
     // Inform the container
     rmContainer.handle(

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Fri Jul 25 20:33:09 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
+    this(applicationId, queue, user, false);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.isAppRecovering = isAppRecovering;
   }
 
   public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
     return user;
   }
 
+  public boolean getIsAppRecovering() {
+    return isAppRecovering;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Fri Jul 25 20:33:09 2014
@@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEve
 
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
-  private final boolean shouldNotifyAttemptAdded;
+  private final boolean isAttemptRecovering;
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
-    this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+    this(applicationAttemptId, transferStateFromPreviousAttempt, false);
   }
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
-    this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
+    this.isAttemptRecovering = isAttemptRecovering;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEve
     return transferStateFromPreviousAttempt;
   }
 
-  public boolean getShouldNotifyAttemptAdded() {
-    return shouldNotifyAttemptAdded;
+  public boolean getIsAttemptRecovering() {
+    return isAttemptRecovering;
   }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Jul 25 20:33:09 2014
@@ -224,16 +224,17 @@ public class FSLeafQueue extends FSQueue
   @Override
   public RMContainer preemptContainer() {
     RMContainer toBePreempted = null;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Queue " + getName() + " is going to preempt a container " +
-          "from its applications.");
-    }
 
     // If this queue is not over its fair share, reject
     if (!preemptContainerPreCheck()) {
       return toBePreempted;
     }
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Queue " + getName() + " is going to preempt a container " +
+          "from its applications.");
+    }
+
     // Choose the app that is most over fair share
     Comparator<Schedulable> comparator = policy.getComparator();
     AppSchedulable candidateSched = null;
@@ -328,4 +329,14 @@ public class FSLeafQueue extends FSQueue
       SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
     // TODO Auto-generated method stub
   }
+
+  /**
+   * Helper method to check if the queue should preempt containers
+   *
+   * @return true if check passes (can preempt) or false otherwise
+   */
+  private boolean preemptContainerPreCheck() {
+    return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
+        getFairShare());
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Jul 25 20:33:09 2014
@@ -164,11 +164,6 @@ public class FSParentQueue extends FSQue
   public RMContainer preemptContainer() {
     RMContainer toBePreempted = null;
 
-    // If this queue is not over its fair share, reject
-    if (!preemptContainerPreCheck()) {
-      return toBePreempted;
-    }
-
     // Find the childQueue which is most over fair share
     FSQueue candidateQueue = null;
     Comparator<Schedulable> comparator = policy.getComparator();

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Fri Jul 25 20:33:09 2014
@@ -187,17 +187,4 @@ public abstract class FSQueue extends Sc
     }
     return true;
   }
-
-  /**
-   * Helper method to check if the queue should preempt containers
-   *
-   * @return true if check passes (can preempt) or false otherwise
-   */
-  protected boolean preemptContainerPreCheck() {
-    if (this == scheduler.getQueueManager().getRootQueue()) {
-      return true;
-    }
-    return parent.getPolicy()
-        .checkIfUsageOverFairShare(getResourceUsage(), getFairShare());
-  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Jul 25 20:33:09 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -82,6 +83,9 @@ public class FSSchedulerApp extends Sche
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
     
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
+    
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(
@@ -281,9 +285,13 @@ public class FSSchedulerApp extends Sche
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
 
+    // Update resource requests related to "request" and store in RMContainer
+    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
     // Inform the container
     rmContainer.handle(
         new RMContainerEvent(container.getId(), RMContainerEventType.START));

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jul 25 20:33:09 2014
@@ -135,7 +135,9 @@ public class FairScheduler extends
   public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
 
   // How often fair shares are re-calculated (ms)
-  protected long UPDATE_INTERVAL = 500;
+  protected long updateInterval;
+  private final int UPDATE_DEBUG_FREQUENCY = 5;
+  private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
 
   private Thread updateThread;
   private Thread schedulingThread;
@@ -242,13 +244,13 @@ public class FairScheduler extends
 
   /**
    * A runnable which calls {@link FairScheduler#update()} every
-   * <code>UPDATE_INTERVAL</code> milliseconds.
+   * <code>updateInterval</code> milliseconds.
    */
   private class UpdateThread implements Runnable {
     public void run() {
       while (true) {
         try {
-          Thread.sleep(UPDATE_INTERVAL);
+          Thread.sleep(updateInterval);
           update();
           preemptTasksIfNecessary();
         } catch (Exception e) {
@@ -275,6 +277,18 @@ public class FairScheduler extends
     // Recursively compute fair shares for all queues
     // and update metrics
     rootQueue.recomputeShares();
+
+    if (LOG.isDebugEnabled()) {
+      if (--updatesToSkipForDebug < 0) {
+        updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
+        LOG.debug("Cluster Capacity: " + clusterResource +
+            "  Allocations: " + rootMetrics.getAllocatedResources() +
+            "  Availability: " + Resource.newInstance(
+            rootMetrics.getAvailableMB(),
+            rootMetrics.getAvailableVirtualCores()) +
+            "  Demand: " + rootQueue.getDemand());
+      }
+    }
   }
 
   /**
@@ -408,7 +422,7 @@ public class FairScheduler extends
     }
   }
   
-  private void warnOrKillContainer(RMContainer container) {
+  protected void warnOrKillContainer(RMContainer container) {
     ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
     FSSchedulerApp app = getSchedulerApp(appAttemptId);
     FSLeafQueue queue = app.getQueue();
@@ -426,6 +440,7 @@ public class FairScheduler extends
           SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
+        recoverResourceRequestForContainer(container);
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
@@ -551,7 +566,7 @@ public class FairScheduler extends
    * configured limits, but the app will not be marked as runnable.
    */
   protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     if (queueName == null || queueName.isEmpty()) {
       String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
@@ -588,8 +603,14 @@ public class FairScheduler extends
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName + ", currently num of applications: "
         + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   /**
@@ -598,7 +619,7 @@ public class FairScheduler extends
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
@@ -627,14 +648,15 @@ public class FairScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
 
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -879,14 +901,14 @@ public class FairScheduler extends
         // Update application requests
         application.updateResourceRequests(ask);
 
-        LOG.debug("allocate: post-update");
         application.showRequests();
       }
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate:" +
+        LOG.debug("allocate: post-update" +
             " applicationAttemptId=" + appAttemptId +
-            " #ask=" + ask.size());
+            " #ask=" + ask.size() +
+            " reservation= " + application.getCurrentReservation());
 
         LOG.debug("Preempting " + application.getPreemptionContainers().size()
             + " container(s)");
@@ -907,22 +929,6 @@ public class FairScheduler extends
   }
 
   /**
-   * Process a container which has launched on a node, as reported by the node.
-   */
-  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
-    // Get the application for the finished container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      return;
-    }
-
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
-  /**
    * Process a heartbeat update from a node.
    */
   private synchronized void nodeUpdate(RMNode nm) {
@@ -964,37 +970,27 @@ public class FairScheduler extends
     }
   }
 
-  private void continuousScheduling() {
-    while (true) {
-      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-      // Sort the nodes by space available on them, so that we offer
-      // containers on emptier nodes first, facilitating an even spread. This
-      // requires holding the scheduler lock, so that the space available on a
-      // node doesn't change during the sort.
-      synchronized (this) {
-        Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-      }
-
-      // iterate all nodes
-      for (NodeId nodeId : nodeIdList) {
-        if (nodes.containsKey(nodeId)) {
-          FSSchedulerNode node = getFSSchedulerNode(nodeId);
-          try {
-            if (Resources.fitsIn(minimumAllocation,
-                    node.getAvailableResource())) {
-              attemptScheduling(node);
-            }
-          } catch (Throwable ex) {
-            LOG.warn("Error while attempting scheduling for node " + node +
-                    ": " + ex.toString(), ex);
-          }
-        }
-      }
+  void continuousSchedulingAttempt() {
+    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+    // Sort the nodes by space available on them, so that we offer
+    // containers on emptier nodes first, facilitating an even spread. This
+    // requires holding the scheduler lock, so that the space available on a
+    // node doesn't change during the sort.
+    synchronized (this) {
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+    }
+
+    // iterate all nodes
+    for (NodeId nodeId : nodeIdList) {
+      FSSchedulerNode node = getFSSchedulerNode(nodeId);
       try {
-        Thread.sleep(getContinuousSchedulingSleepMs());
-      } catch (InterruptedException e) {
-        LOG.warn("Error while doing sleep in continuous scheduling: " +
-                e.toString(), e);
+        if (node != null && Resources.fitsIn(minimumAllocation,
+            node.getAvailableResource())) {
+          attemptScheduling(node);
+        }
+      } catch (Throwable ex) {
+        LOG.error("Error while attempting scheduling for node " + node +
+            ": " + ex.toString(), ex);
       }
     }
   }
@@ -1004,6 +1000,12 @@ public class FairScheduler extends
 
     @Override
     public int compare(NodeId n1, NodeId n2) {
+      if (!nodes.containsKey(n1)) {
+        return 1;
+      }
+      if (!nodes.containsKey(n2)) {
+        return -1;
+      }
       return RESOURCE_CALCULATOR.compare(clusterResource,
               nodes.get(n2).getAvailableResource(),
               nodes.get(n1).getAvailableResource());
@@ -1121,7 +1123,8 @@ public class FairScheduler extends
       }
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1139,7 +1142,7 @@ public class FairScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1203,6 +1206,15 @@ public class FairScheduler extends
     waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
     usePortForNodeName = this.conf.getUsePortForNodeName();
 
+    updateInterval = this.conf.getUpdateInterval();
+    if (updateInterval < 0) {
+      updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
+      LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
+              + " is invalid, so using default value " +
+              + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+              + " ms instead");
+    }
+
     rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
     // This stores per-application scheduling information
     this.applications =
@@ -1227,7 +1239,16 @@ public class FairScheduler extends
           new Runnable() {
             @Override
             public void run() {
-              continuousScheduling();
+              while (!Thread.currentThread().isInterrupted()) {
+                try {
+                  continuousSchedulingAttempt();
+                  Thread.sleep(getContinuousSchedulingSleepMs());
+                } catch (InterruptedException e) {
+                  LOG.error("Continuous scheduling thread interrupted. Exiting. ",
+                      e);
+                  return;
+                }
+              }
             }
           }
       );

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Fri Jul 25 20:33:09 2014
@@ -123,6 +123,11 @@ public class FairSchedulerConfiguration 
   protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
   protected static final int DEFAULT_MAX_ASSIGN = -1;
 
+  /** The update interval for calculating resources in FairScheduler .*/
+  public static final String UPDATE_INTERVAL_MS =
+      CONF_PREFIX + "update-interval-ms";
+  public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
+
   public FairSchedulerConfiguration() {
     super();
   }
@@ -246,6 +251,10 @@ public class FairSchedulerConfiguration 
           "Error reading resource config", ex);
     }
   }
+
+  public long getUpdateInterval() {
+    return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
+  }
   
   private static int findResource(String val, String units)
     throws AllocationConfigurationException {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Fri Jul 25 20:33:09 2014
@@ -65,6 +65,7 @@ public class FairSharePolicy extends Sch
   private static class FairShareComparator implements Comparator<Schedulable>,
       Serializable {
     private static final long serialVersionUID = 5564969375856699313L;
+    private static final Resource ONE = Resources.createResource(1);
 
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
@@ -78,11 +79,10 @@ public class FairSharePolicy extends Sch
           s1.getResourceUsage(), minShare1);
       boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
           s2.getResourceUsage(), minShare2);
-      Resource one = Resources.createResource(1);
       minShareRatio1 = (double) s1.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
       useToWeightRatio1 = s1.getResourceUsage().getMemory() /
           s1.getWeights().getWeight(ResourceType.MEMORY);
       useToWeightRatio2 = s2.getResourceUsage().getMemory() /

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Jul 25 20:33:09 2014
@@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -356,22 +355,28 @@ public class FifoScheduler extends
 
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
-      String queue, String user) {
+      String queue, String user, boolean isAppRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", currently num of applications: " + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   @VisibleForTesting
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt,
-          boolean shouldNotifyAttemptAdded) {
+          boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
@@ -389,14 +394,15 @@ public class FifoScheduler extends
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(appAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(appAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -772,7 +778,8 @@ public class FifoScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -788,7 +795,7 @@ public class FifoScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -823,23 +830,6 @@ public class FifoScheduler extends
     }
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      // Some unknown container sneaked into the system. Kill it.
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Lock(FifoScheduler.class)
   private synchronized void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Fri Jul 25 20:33:09 2014
@@ -19,22 +19,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-
-import javax.crypto.SecretKey;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * AMRM-tokens are per ApplicationAttempt. If users redistribute their
@@ -49,40 +55,66 @@ public class AMRMTokenSecretManager exte
   private static final Log LOG = LogFactory
     .getLog(AMRMTokenSecretManager.class);
 
-  private SecretKey masterKey;
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
   private final Timer timer;
   private final long rollingInterval;
+  private final long activationDelay;
 
-  private final Map<ApplicationAttemptId, byte[]> passwords =
-      new HashMap<ApplicationAttemptId, byte[]>();
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
 
   /**
    * Create an {@link AMRMTokenSecretManager}
    */
   public AMRMTokenSecretManager(Configuration conf) {
-    rollMasterKey();
     this.timer = new Timer();
     this.rollingInterval =
         conf
           .getLong(
             YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
             YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 2 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
   }
 
   public void start() {
-    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+      rollingInterval);
   }
 
   public void stop() {
     this.timer.cancel();
   }
 
-  public synchronized void applicationMasterFinished(
-      ApplicationAttemptId appAttemptId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Application finished, removing password for " + appAttemptId);
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for " + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.remove(appAttemptId);
   }
 
   private class MasterKeyRoller extends TimerTask {
@@ -93,49 +125,89 @@ public class AMRMTokenSecretManager exte
   }
 
   @Private
-  public synchronized void setMasterKey(SecretKey masterKey) {
-    this.masterKey = masterKey;
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  @Private
-  public synchronized SecretKey getMasterKey() {
-    return this.masterKey;
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Private
-  synchronized void rollMasterKey() {
-    LOG.info("Rolling master-key for amrm-tokens");
-    this.masterKey = generateSecret();
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  /**
-   * Create a password for a given {@link AMRMTokenIdentifier}. Used to
-   * send to the AppicationAttempt which can give it back during authentication.
-   */
-  @Override
-  public synchronized byte[] createPassword(
-      AMRMTokenIdentifier identifier) {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating password for " + applicationAttemptId);
-    }
-    byte[] password = createPassword(identifier.getBytes(), masterKey);
-    this.passwords.put(applicationAttemptId, password);
-    return password;
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
+            .getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
+        identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not Null, then return nextMasterKey
+  // otherwise return currentMasterKey
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
    */
-  public synchronized void
-      addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
-    AMRMTokenIdentifier identifier = token.decodeIdentifier();
-    if (LOG.isDebugEnabled()) {
+  public void addPersistedPassword(Token<AMRMTokenIdentifier> token)
+      throws IOException {
+    this.writeLock.lock();
+    try {
+      AMRMTokenIdentifier identifier = token.decodeIdentifier();
       LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+      appAttemptSet.add(identifier.getApplicationAttemptId());
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.put(identifier.getApplicationAttemptId(),
-      token.getPassword());
   }
 
   /**
@@ -143,19 +215,35 @@ public class AMRMTokenSecretManager exte
    * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
    */
   @Override
-  public synchronized byte[] retrievePassword(
-      AMRMTokenIdentifier identifier) throws InvalidToken {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to retrieve password for " + applicationAttemptId);
-    }
-    byte[] password = this.passwords.get(applicationAttemptId);
-    if (password == null) {
-      throw new InvalidToken("Password not found for ApplicationAttempt "
-          + applicationAttemptId);
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken("Password not found for ApplicationAttempt "
+            + applicationAttemptId);
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+        .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+            .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Given AMRMToken for application : "
+          + applicationAttemptId.toString()
+          + " seems to have been generated illegally.");
+    } finally {
+      this.readLock.unlock();
     }
-    return password;
   }
 
   /**
@@ -167,4 +255,40 @@ public class AMRMTokenSecretManager exte
     return new AMRMTokenIdentifier();
   }
 
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getCurrnetMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.currentMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+        .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Fri Jul 25 20:33:09 2014
@@ -388,7 +388,11 @@ public class DelegationTokenRenewer exte
       // If user provides incorrect token then it should not be added for
       // renewal.
       for (DelegationTokenToRenew dtr : tokenList) {
-        renewToken(dtr);
+        try {
+          renewToken(dtr);
+        } catch (IOException ioe) {
+          throw new IOException("Failed to renew token: " + dtr.token, ioe);
+        }
       }
       for (DelegationTokenToRenew dtr : tokenList) {
         addTokenToList(dtr);

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Fri Jul 25 20:33:09 2014
@@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -193,4 +195,14 @@ public class RMDelegationTokenSecretMana
       addPersistedDelegationToken(entry.getKey(), entry.getValue());
     }
   }
+
+  public long getRenewDate(RMDelegationTokenIdentifier ident)
+      throws InvalidToken {
+    DelegationTokenInformation info = currentTokens.get(ident);
+    if (info == null) {
+      throw new InvalidToken("token (" + ident.toString()
+          + ") can't be found in cache");
+    }
+    return info.getRenewDate();
+  }
 }



Mime
View raw message