hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [3/3] hadoop git commit: YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
Date Wed, 30 Dec 2015 23:42:08 GMT
YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05fa852d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05fa852d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05fa852d

Branch: refs/heads/branch-2
Commit: 05fa852d7567b7590d6b53bbf925f8f424736514
Parents: 6eefae1
Author: Wangda Tan <wangda@apache.org>
Authored: Wed Dec 30 15:36:55 2015 -0800
Committer: Wangda Tan <wangda@apache.org>
Committed: Wed Dec 30 15:36:55 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../scheduler/AppSchedulingInfo.java            | 379 +++++++++----------
 .../scheduler/SchedulerApplicationAttempt.java  |   2 +-
 3 files changed, 174 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c208c17..146ed62 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -26,6 +26,8 @@ Release 2.9.0 - UNRELEASED
 
     YARN-4522. Queue acl can be checked at app submission. (Jian He via wangda)
 
+    YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index c5f8cd1..41d3fd7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -42,7 +43,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -56,40 +56,36 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class AppSchedulingInfo {
   
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
+  private static final Comparator COMPARATOR =
+      new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator();
+  private static final int EPOCH_BIT_SHIFT = 40;
+
+  private final ApplicationId applicationId;
   private final ApplicationAttemptId applicationAttemptId;
-  final ApplicationId applicationId;
-  private String queueName;
-  Queue queue;
-  final String user;
-  // TODO making containerIdCounter long
   private final AtomicLong containerIdCounter;
-  private final int EPOCH_BIT_SHIFT = 40;
+  private final String user;
+
+  private Queue queue;
+  private ActiveUsersManager activeUsersManager;
+  private boolean pending = true; // whether accepted/allocated by scheduler
+  private ResourceUsage appResourceUsage;
+
+  private final Set<String> amBlacklist = new HashSet<>();
+  private Set<String> userBlacklist = new HashSet<>();
 
-  final Set<Priority> priorities = new TreeSet<Priority>(
-      new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+  final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
   final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
-      new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
-  final Map<NodeId, Map<Priority, Map<ContainerId, 
-      SchedContainerChangeRequest>>> increaseRequestMap =
       new ConcurrentHashMap<>();
-  private Set<String> userBlacklist = new HashSet<>();
-  private Set<String> amBlacklist = new HashSet<>();
+  final Map<NodeId, Map<Priority, Map<ContainerId,
+      SchedContainerChangeRequest>>> containerIncreaseRequestMap =
+      new ConcurrentHashMap<>();
 
-  //private final ApplicationStore store;
-  private ActiveUsersManager activeUsersManager;
-  
-  /* Allocated by scheduler */
-  boolean pending = true; // for app metrics
-  
-  private ResourceUsage appResourceUsage;
- 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       long epoch, ResourceUsage appResourceUsage) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
-    this.queueName = queue.getQueueName();
     this.user = user;
     this.activeUsersManager = activeUsersManager;
     this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
@@ -104,14 +100,18 @@ public class AppSchedulingInfo {
     return applicationAttemptId;
   }
 
-  public String getQueueName() {
-    return queueName;
-  }
-
   public String getUser() {
     return user;
   }
 
+  public long getNewContainerId() {
+    return this.containerIdCounter.incrementAndGet();
+  }
+
+  public synchronized String getQueueName() {
+    return queue.getQueueName();
+  }
+
   public synchronized boolean isPending() {
     return pending;
   }
@@ -125,30 +125,23 @@ public class AppSchedulingInfo {
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
-  public long getNewContainerId() {
-    return this.containerIdCounter.incrementAndGet();
-  }
-  
-  public boolean hasIncreaseRequest(NodeId nodeId) {
+  public synchronized boolean hasIncreaseRequest(NodeId nodeId) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode
=
-        increaseRequestMap.get(nodeId);
-    if (null == requestsOnNode) {
-      return false;
-    }
-    return requestsOnNode.size() > 0;
+        containerIncreaseRequestMap.get(nodeId);
+    return requestsOnNode == null ? false : requestsOnNode.size() > 0;
   }
   
-  public Map<ContainerId, SchedContainerChangeRequest>
+  public synchronized Map<ContainerId, SchedContainerChangeRequest>
       getIncreaseRequests(NodeId nodeId, Priority priority) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode
=
-        increaseRequestMap.get(nodeId);
-    if (null == requestsOnNode) {
-      return null;
-    }
-
-    return requestsOnNode.get(priority);
+        containerIncreaseRequestMap.get(nodeId);
+    return requestsOnNode == null ? null : requestsOnNode.get(priority);
   }
 
+  /**
+   * return true if any of the existing increase requests are updated,
+   *        false if none of them are updated
+   */
   public synchronized boolean updateIncreaseRequests(
       List<SchedContainerChangeRequest> increaseRequests) {
     boolean resourceUpdated = false;
@@ -157,10 +150,10 @@ public class AppSchedulingInfo {
       NodeId nodeId = r.getRMContainer().getAllocatedNode();
 
       Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode
=
-          increaseRequestMap.get(nodeId);
+          containerIncreaseRequestMap.get(nodeId);
       if (null == requestsOnNode) {
         requestsOnNode = new TreeMap<>();
-        increaseRequestMap.put(nodeId, requestsOnNode);
+        containerIncreaseRequestMap.put(nodeId, requestsOnNode);
       }
 
       SchedContainerChangeRequest prevChangeRequest =
@@ -168,22 +161,21 @@ public class AppSchedulingInfo {
       if (null != prevChangeRequest) {
         if (Resources.equals(prevChangeRequest.getTargetCapacity(),
             r.getTargetCapacity())) {
-          // New target capacity is as same as what we have, just ignore the new
-          // one
+          // increase request hasn't changed
           continue;
         }
 
-        // remove the old one
+        // remove the old one, as we will use the new one going forward
         removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(),
             prevChangeRequest.getContainerId());
       }
 
-      if (Resources.equals(r.getTargetCapacity(), r.getRMContainer().getAllocatedResource()))
{
+      if (Resources.equals(r.getTargetCapacity(),
+          r.getRMContainer().getAllocatedResource())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Trying to increase/decrease container, "
-              + "target capacity = previous capacity = " + prevChangeRequest
-              + " for container=" + r.getContainerId()
-              + ". Will ignore this increase request");
+          LOG.debug("Trying to increase container " + r.getContainerId()
+              + ", target capacity = previous capacity = " + prevChangeRequest
+              + ". Will ignore this increase request.");
         }
         continue;
       }
@@ -195,25 +187,26 @@ public class AppSchedulingInfo {
     return resourceUpdated;
   }
 
-  // insert increase request and add missing hierarchy if missing
+  /**
+   * Insert increase request, adding any missing items in the data-structure
+   * hierarchy.
+   */
   private void insertIncreaseRequest(SchedContainerChangeRequest request) {
     NodeId nodeId = request.getNodeId();
     Priority priority = request.getPriority();
     ContainerId containerId = request.getContainerId();
     
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode
=
-        increaseRequestMap.get(nodeId);
+        containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
-      requestsOnNode =
-          new HashMap<Priority, Map<ContainerId, SchedContainerChangeRequest>>();
-      increaseRequestMap.put(nodeId, requestsOnNode);
+      requestsOnNode = new HashMap<>();
+      containerIncreaseRequestMap.put(nodeId, requestsOnNode);
     }
 
     Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
         requestsOnNode.get(priority);
     if (null == requestsOnNodeWithPriority) {
-      requestsOnNodeWithPriority =
-          new TreeMap<ContainerId, SchedContainerChangeRequest>();
+      requestsOnNodeWithPriority = new TreeMap<>();
       requestsOnNode.put(priority, requestsOnNodeWithPriority);
     }
 
@@ -237,7 +230,7 @@ public class AppSchedulingInfo {
   public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority,
       ContainerId containerId) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode
=
-        increaseRequestMap.get(nodeId);
+        containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
       return false;
     }
@@ -256,7 +249,7 @@ public class AppSchedulingInfo {
       requestsOnNode.remove(priority);
     }
     if (requestsOnNode.isEmpty()) {
-      increaseRequestMap.remove(nodeId);
+      containerIncreaseRequestMap.remove(nodeId);
     }
     
     if (request == null) {
@@ -279,18 +272,15 @@ public class AppSchedulingInfo {
   public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
       Priority priority, ContainerId containerId) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode
=
-        increaseRequestMap.get(nodeId);
+        containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
       return null;
     }
 
     Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
         requestsOnNode.get(priority);
-    if (null == requestsOnNodeWithPriority) {
-      return null;
-    }
-
-    return requestsOnNodeWithPriority.get(containerId);
+    return requestsOnNodeWithPriority == null ? null
+        : requestsOnNodeWithPriority.get(containerId);
   }
 
   /**
@@ -299,121 +289,120 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
-   * @param recoverPreemptedRequest recover Resource Request on preemption
-   * @return true if any resource was updated, false else
+   * @param recoverPreemptedRequest recover ResourceRequest on preemption
+   * @return true if any resource was updated, false otherwise
    */
-  synchronized public boolean updateResourceRequests(
+  public synchronized boolean updateResourceRequests(
       List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
-    QueueMetrics metrics = queue.getMetrics();
-    
+    // Flag to track if any incoming requests update "ANY" requests
     boolean anyResourcesUpdated = false;
 
     // Update resource requests
     for (ResourceRequest request : requests) {
       Priority priority = request.getPriority();
       String resourceName = request.getResourceName();
-      boolean updatePendingResources = false;
-      ResourceRequest lastRequest = null;
 
-      if (resourceName.equals(ResourceRequest.ANY)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("update:" + " application=" + applicationId + " request="
-              + request);
-        }
-        updatePendingResources = true;
-        anyResourcesUpdated = true;
-        
-        // Premature optimization?
-        // Assumes that we won't see more than one priority request updated
-        // in one call, reasonable assumption... however, it's totally safe
-        // to activate same application more than once.
-        // Thus we don't need another loop ala the one in decrementOutstanding()  
-        // which is needed during deactivate.
-        if (request.getNumContainers() > 0) {
-          activeUsersManager.activateApplication(user, applicationId);
-        }
-        ResourceRequest previousAnyRequest =
-            getResourceRequest(priority, resourceName);
-
-        // When there is change in ANY request label expression, we should
-        // update label for all resource requests already added of same
-        // priority as ANY resource request.
-        if ((null == previousAnyRequest)
-            || isRequestLabelChanged(previousAnyRequest, request)) {
-          Map<String, ResourceRequest> resourceRequest =
-              getResourceRequests(priority);
-          if (resourceRequest != null) {
-            for (ResourceRequest r : resourceRequest.values()) {
-              if (!r.getResourceName().equals(ResourceRequest.ANY)) {
-                r.setNodeLabelExpression(request.getNodeLabelExpression());
-              }
-            }
-          }
-        }
-      } else {
-        ResourceRequest anyRequest =
-            getResourceRequest(priority, ResourceRequest.ANY);
-        if (anyRequest != null) {
-          request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
-        }
-      }
+      // Update node labels if required
+      updateNodeLabels(request);
 
       Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority);
-
       if (asks == null) {
-        asks = new ConcurrentHashMap<String, ResourceRequest>();
+        asks = new ConcurrentHashMap<>();
         this.resourceRequestMap.put(priority, asks);
         this.priorities.add(priority);
       }
-      lastRequest = asks.get(resourceName);
 
+      // Increment number of containers if recovering preempted resources
+      ResourceRequest lastRequest = asks.get(resourceName);
       if (recoverPreemptedRequest && lastRequest != null) {
-        // Increment the number of containers to 1, as it is recovering a
-        // single container.
         request.setNumContainers(lastRequest.getNumContainers() + 1);
       }
 
+      // Update asks
       asks.put(resourceName, request);
-      if (updatePendingResources) {
-        
-        // Similarly, deactivate application?
-        if (request.getNumContainers() <= 0) {
-          LOG.info("checking for deactivate of application :"
-              + this.applicationId);
-          checkForDeactivation();
-        }
-        
-        int lastRequestContainers = lastRequest != null ? lastRequest
-            .getNumContainers() : 0;
-        Resource lastRequestCapability = lastRequest != null ? lastRequest
-            .getCapability() : Resources.none();
-        metrics.incrPendingResources(user, request.getNumContainers(),
-            request.getCapability());
-        metrics.decrPendingResources(user, lastRequestContainers,
-            lastRequestCapability);
-        
-        // update queue:
-        Resource increasedResource =
-            Resources.multiply(request.getCapability(),
-                request.getNumContainers());
-        queue.incPendingResource(request.getNodeLabelExpression(),
-            increasedResource);
-        appResourceUsage.incPending(request.getNodeLabelExpression(),
-            increasedResource);
-        if (lastRequest != null) {
-          Resource decreasedResource =
-              Resources.multiply(lastRequestCapability, lastRequestContainers);
-          queue.decPendingResource(lastRequest.getNodeLabelExpression(),
-              decreasedResource);
-          appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
-              decreasedResource);
+
+      if (resourceName.equals(ResourceRequest.ANY)) {
+        anyResourcesUpdated = true;
+
+        // Activate application. Metrics activation is done here.
+        // TODO: Shouldn't we activate even if numContainers = 0?
+        if (request.getNumContainers() > 0) {
+          activeUsersManager.activateApplication(user, applicationId);
         }
+
+        // Update pendingResources
+        updatePendingResources(lastRequest, request, queue.getMetrics());
       }
     }
     return anyResourcesUpdated;
   }
 
-  private boolean isRequestLabelChanged(ResourceRequest requestOne,
+  private void updatePendingResources(ResourceRequest lastRequest,
+      ResourceRequest request, QueueMetrics metrics) {
+    if (request.getNumContainers() <= 0) {
+      LOG.info("checking for deactivate of application :"
+          + this.applicationId);
+      checkForDeactivation();
+    }
+
+    int lastRequestContainers =
+        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
+    Resource lastRequestCapability =
+        lastRequest != null ? lastRequest.getCapability() : Resources.none();
+    metrics.incrPendingResources(user,
+        request.getNumContainers(), request.getCapability());
+    metrics.decrPendingResources(user,
+        lastRequestContainers, lastRequestCapability);
+
+    // update queue:
+    Resource increasedResource =
+        Resources.multiply(request.getCapability(), request.getNumContainers());
+    queue.incPendingResource(request.getNodeLabelExpression(),
+        increasedResource);
+    appResourceUsage.incPending(request.getNodeLabelExpression(),
+        increasedResource);
+    if (lastRequest != null) {
+      Resource decreasedResource =
+          Resources.multiply(lastRequestCapability, lastRequestContainers);
+      queue.decPendingResource(lastRequest.getNodeLabelExpression(),
+          decreasedResource);
+      appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
+          decreasedResource);
+    }
+  }
+
+  private void updateNodeLabels(ResourceRequest request) {
+    Priority priority = request.getPriority();
+    String resourceName = request.getResourceName();
+    if (resourceName.equals(ResourceRequest.ANY)) {
+      ResourceRequest previousAnyRequest =
+          getResourceRequest(priority, resourceName);
+
+      // When there is change in ANY request label expression, we should
+      // update label for all resource requests already added of same
+      // priority as ANY resource request.
+      if ((null == previousAnyRequest)
+          || hasRequestLabelChanged(previousAnyRequest, request)) {
+        Map<String, ResourceRequest> resourceRequest =
+            getResourceRequests(priority);
+        if (resourceRequest != null) {
+          for (ResourceRequest r : resourceRequest.values()) {
+            if (!r.getResourceName().equals(ResourceRequest.ANY)) {
+              r.setNodeLabelExpression(request.getNodeLabelExpression());
+            }
+          }
+        }
+      }
+    } else {
+      ResourceRequest anyRequest =
+          getResourceRequest(priority, ResourceRequest.ANY);
+      if (anyRequest != null) {
+        request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
+      }
+    }
+  }
+
+  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
       ResourceRequest requestTwo) {
     String requestOneLabelExp = requestOne.getNodeLabelExpression();
     String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
@@ -465,24 +454,24 @@ public class AppSchedulingInfo {
     }
   }
 
-  synchronized public Collection<Priority> getPriorities() {
+  public synchronized Collection<Priority> getPriorities() {
     return priorities;
   }
 
-  synchronized public Map<String, ResourceRequest> getResourceRequests(
+  public synchronized Map<String, ResourceRequest> getResourceRequests(
       Priority priority) {
     return resourceRequestMap.get(priority);
   }
 
-  public List<ResourceRequest> getAllResourceRequests() {
-    List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
+  public synchronized List<ResourceRequest> getAllResourceRequests() {
+    List<ResourceRequest> ret = new ArrayList<>();
     for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
       ret.addAll(r.values());
     }
     return ret;
   }
 
-  synchronized public ResourceRequest getResourceRequest(Priority priority,
+  public synchronized ResourceRequest getResourceRequest(Priority priority,
       String resourceName) {
     Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority);
     return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
@@ -511,7 +500,7 @@ public class AppSchedulingInfo {
       }
     }
   }
-  
+
   public synchronized void increaseContainer(
       SchedContainerChangeRequest increaseRequest) {
     NodeId nodeId = increaseRequest.getNodeId();
@@ -559,28 +548,17 @@ public class AppSchedulingInfo {
   /**
    * Resources have been allocated to this application by the resource
    * scheduler. Track them.
-   * 
-   * @param type
-   *          the type of the node
-   * @param node
-   *          the nodeinfo of the node
-   * @param priority
-   *          the priority of the request.
-   * @param request
-   *          the request
-   * @param container
-   *          the containers allocated.
    */
-  synchronized public List<ResourceRequest> allocate(NodeType type,
+  public synchronized List<ResourceRequest> allocate(NodeType type,
       SchedulerNode node, Priority priority, ResourceRequest request,
-      Container container) {
-    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
+      Container containerAllocated) {
+    List<ResourceRequest> resourceRequests = new ArrayList<>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container, resourceRequests);
+      allocateNodeLocal(node, priority, request, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container, resourceRequests);
+      allocateRackLocal(node, priority, request, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container, resourceRequests);
+      allocateOffSwitch(request, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -592,8 +570,8 @@ public class AppSchedulingInfo {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("allocate: applicationId=" + applicationId
-          + " container=" + container.getId()
-          + " host=" + container.getNodeId().toString()
+          + " container=" + containerAllocated.getId()
+          + " host=" + containerAllocated.getNodeId().toString()
           + " user=" + user
           + " resource=" + request.getCapability()
           + " type=" + type);
@@ -606,12 +584,9 @@ public class AppSchedulingInfo {
   /**
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
-   * 
-   * @param allocatedContainers
-   *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal(SchedulerNode node,
-      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+  private synchronized void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decResourceRequest(node.getNodeName(), priority, nodeLocalRequest);
@@ -641,12 +616,9 @@ public class AppSchedulingInfo {
   /**
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
-   * 
-   * @param allocatedContainers
-   *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(SchedulerNode node,
-      Priority priority, ResourceRequest rackLocalRequest, Container container,
+  private synchronized void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decResourceRequest(node.getRackName(), priority, rackLocalRequest);
@@ -663,20 +635,16 @@ public class AppSchedulingInfo {
   /**
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
-   * 
-   * @param allocatedContainers
-   *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(SchedulerNode node,
-      Priority priority, ResourceRequest offSwitchRequest, Container container,
-      List<ResourceRequest> resourceRequests) {
+  private synchronized void allocateOffSwitch(
+      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
     // Update cloned OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
-  synchronized private void decrementOutstanding(
+  private synchronized void decrementOutstanding(
       ResourceRequest offSwitchRequest) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
@@ -695,7 +663,7 @@ public class AppSchedulingInfo {
         offSwitchRequest.getCapability());
   }
   
-  synchronized private void checkForDeactivation() {
+  private synchronized void checkForDeactivation() {
     boolean deactivate = true;
     for (Priority priority : getPriorities()) {
       ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
@@ -709,7 +677,7 @@ public class AppSchedulingInfo {
     
     // also we need to check increase request
     if (!deactivate) {
-      deactivate = increaseRequestMap.isEmpty();
+      deactivate = containerIncreaseRequestMap.isEmpty();
     }
 
     if (deactivate) {
@@ -717,7 +685,7 @@ public class AppSchedulingInfo {
     }
   }
   
-  synchronized public void move(Queue newQueue) {
+  public synchronized void move(Queue newQueue) {
     QueueMetrics oldMetrics = queue.getMetrics();
     QueueMetrics newMetrics = newQueue.getMetrics();
     for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@@ -741,10 +709,9 @@ public class AppSchedulingInfo {
     activeUsersManager = newQueue.getActiveUsersManager();
     activeUsersManager.activateApplication(user, applicationId);
     this.queue = newQueue;
-    this.queueName = newQueue.getQueueName();
   }
 
-  synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
+  public synchronized void stop() {
     // clear pending resources metrics for the application
     QueueMetrics metrics = queue.getMetrics();
     for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@@ -782,12 +749,8 @@ public class AppSchedulingInfo {
 
   public synchronized void transferStateFromPreviousAppSchedulingInfo(
       AppSchedulingInfo appInfo) {
-    //    this.priorities = appInfo.getPriorities();
-    //    this.requests = appInfo.getRequests();
     // This should not require locking the userBlacklist since it will not be
     // used by this instance until after setCurrentAppAttempt.
-    // Should cleanup this to avoid sharing between instances and can
-    // then remove getBlacklist as well.
     this.userBlacklist = appInfo.getBlackList();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 09f3598..4d81350 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -331,7 +331,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity
{
   public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
     // Cleanup all scheduling information
     isStopped = true;
-    appSchedulingInfo.stop(rmAppAttemptFinalState);
+    appSchedulingInfo.stop();
   }
 
   public synchronized boolean isStopped() {


Mime
View raw message