hadoop-yarn-commits mailing list archives

From: vino...@apache.org
Subject: svn commit: r1557318 [1/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/proto/ hadoop-yarn/hadoop-yarn-common/src/main/java/...
Date: Sat, 11 Jan 2014 07:07:17 GMT
Author: vinodkv
Date: Sat Jan 11 07:07:17 2014
New Revision: 1557318

URL: http://svn.apache.org/r1557318
Log:
YARN-1041. Added the ApplicationMasterProtocol API for applications to use the ability in ResourceManager to optionally not kill containers when the ApplicationMaster exits. Contributed by Jian He.
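
For orientation (this note is not part of the commit): with this change, an ApplicationMaster that registers after a restart can read back the containers the ResourceManager kept running from the previous attempt. A rough, hedged AM-side sketch, assuming the standard registration flow; the class name, host, port, and tracking URL below are invented for illustration:

import java.util.List;

import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;

public class WorkPreservingAmSketch {

  // Register with the RM, then re-attach to containers that survived the
  // previous AM attempt instead of requesting replacements.
  public static void recoverPreviousContainers(ApplicationMasterProtocol amRpc)
      throws Exception {
    RegisterApplicationMasterRequest request =
        RegisterApplicationMasterRequest.newInstance("am-host", 0, "");
    RegisterApplicationMasterResponse response =
        amRpc.registerApplicationMaster(request);

    // New in YARN-1041: containers the RM chose not to kill when the
    // previous ApplicationMaster exited.
    List<Container> previous = response.getContainersFromPreviousAttempt();
    for (Container container : previous) {
      System.out.println("Recovered running container " + container.getId()
          + " on node " + container.getNodeId());
    }
  }
}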

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Jan 11 07:07:17 2014
@@ -63,6 +63,10 @@ Release 2.4.0 - UNRELEASED
 
     YARN-1033. Expose RM active/standby state to Web UI and REST API (kasha)
 
+    YARN-1041. Added the ApplicationMasterProtocol API for applications to use the
+    ability in ResourceManager to optionally not kill containers when the
+    ApplicationMaster exits. (Jian He via vinodkv)
+
   IMPROVEMENTS
 
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java Sat Jan 11 07:07:17 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.api.protocolrecords;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -27,6 +28,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -47,16 +49,19 @@ import org.apache.hadoop.yarn.util.Recor
 @Public
 @Stable
 public abstract class RegisterApplicationMasterResponse {
+
   @Private
   @Unstable
   public static RegisterApplicationMasterResponse newInstance(
       Resource minCapability, Resource maxCapability,
-      Map<ApplicationAccessType, String> acls, ByteBuffer key) {
+      Map<ApplicationAccessType, String> acls, ByteBuffer key,
+      List<Container> containersFromPreviousAttempt) {
     RegisterApplicationMasterResponse response =
         Records.newRecord(RegisterApplicationMasterResponse.class);
     response.setMaximumResourceCapability(maxCapability);
     response.setApplicationACLs(acls);
     response.setClientToAMTokenMasterKey(key);
+    response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
     return response;
   }
 
@@ -105,4 +110,30 @@ public abstract class RegisterApplicatio
   @Public
   @Stable
   public abstract void setClientToAMTokenMasterKey(ByteBuffer key);
+
+  /**
+   * <p>
+   * Get the list of running containers as viewed by
+   * <code>ResourceManager</code> from previous application attempt.
+   * </p>
+   * 
+   * @return the list of running containers as viewed by
+   *         <code>ResourceManager</code> from previous application attempt
+   */
+  @Public
+  @Unstable
+  public abstract List<Container> getContainersFromPreviousAttempt();
+
+  /**
+   * Set the list of running containers as viewed by
+   * <code>ResourceManager</code> from previous application attempt.
+   * 
+   * @param containersFromPreviousAttempt
+   *          the list of running containers as viewed by
+   *          <code>ResourceManager</code> from previous application attempt.
+   */
+  @Private
+  @Unstable
+  public abstract void setContainersFromPreviousAttempt(
+      List<Container> containersFromPreviousAttempt);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Sat Jan 11 07:07:17 2014
@@ -44,6 +44,7 @@ message RegisterApplicationMasterRespons
   optional ResourceProto maximumCapability = 1;
   optional bytes client_to_am_token_master_key = 2;
   repeated ApplicationACLMapProto application_ACLs = 3;
+  repeated ContainerProto containers_from_previous_attempt = 4;
 }
 
 message FinishApplicationMasterRequestProto {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java Sat Jan 11 07:07:17 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.proto
 
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -29,10 +30,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
@@ -52,6 +56,7 @@ public class RegisterApplicationMasterRe
 
   private Resource maximumResourceCapability;
   private Map<ApplicationAccessType, String> applicationACLS = null;
+  private List<Container> containersFromPreviousAttempt = null;
 
   public RegisterApplicationMasterResponsePBImpl() {
     builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -105,6 +110,9 @@ public class RegisterApplicationMasterRe
     if (this.applicationACLS != null) {
       addApplicationACLs();
     }
+    if (this.containersFromPreviousAttempt != null) {
+      addRunningContainersToProto();
+    }
   }
 
 
@@ -226,6 +234,43 @@ public class RegisterApplicationMasterRe
         ByteBuffer.wrap(builder.getClientToAmTokenMasterKey().toByteArray());
     return key;
   }
+
+  @Override
+  public List<Container> getContainersFromPreviousAttempt() {
+    if (this.containersFromPreviousAttempt != null) {
+      return this.containersFromPreviousAttempt;
+    }
+    initRunningContainersList();
+    return this.containersFromPreviousAttempt;
+  }
+
+  @Override
+  public void setContainersFromPreviousAttempt(final List<Container> containers) {
+    if (containers == null) {
+      return;
+    }
+    this.containersFromPreviousAttempt = new ArrayList<Container>();
+    this.containersFromPreviousAttempt.addAll(containers);
+  }
+
+  private void initRunningContainersList() {
+    RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerProto> list = p.getContainersFromPreviousAttemptList();
+    containersFromPreviousAttempt = new ArrayList<Container>();
+    for (ContainerProto c : list) {
+      containersFromPreviousAttempt.add(convertFromProtoFormat(c));
+    }
+  }
+
+  private void addRunningContainersToProto() {
+    maybeInitBuilder();
+    builder.clearContainersFromPreviousAttempt();
+    List<ContainerProto> list = new ArrayList<ContainerProto>();
+    for (Container c : containersFromPreviousAttempt) {
+      list.add(convertToProtoFormat(c));
+    }
+    builder.addAllContainersFromPreviousAttempt(list);
+  }
   
   private Resource convertFromProtoFormat(ResourceProto resource) {
     return new ResourcePBImpl(resource);
@@ -235,4 +280,11 @@ public class RegisterApplicationMasterRe
     return ((ResourcePBImpl)resource).getProto();
   }
 
+  private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+    return new ContainerPBImpl(p);
+  }
+
+  private ContainerProto convertToProtoFormat(Container t) {
+    return ((ContainerPBImpl) t).getProto();
+  }
 }
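
The implementation above follows the usual YARN PB-record pattern: the list is held in a Java-side field and only flushed into the protobuf builder when the proto is materialized, while reads lazily rebuild the list from the proto. A hedged round-trip sketch, assuming this PBImpl, like the other YARN records, exposes getProto() and a proto-taking constructor:

import java.util.Arrays;

import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;

public class ResponseRoundTripSketch {

  // Set the new field, serialize to the wire format, and read it back.
  public static void roundTrip(Container someContainer) {
    RegisterApplicationMasterResponsePBImpl original =
        new RegisterApplicationMasterResponsePBImpl();
    original.setContainersFromPreviousAttempt(Arrays.asList(someContainer));

    // Materializing the proto copies the Java-side list into the repeated
    // containers_from_previous_attempt field (addRunningContainersToProto).
    RegisterApplicationMasterResponseProto proto = original.getProto();

    RegisterApplicationMasterResponse parsed =
        new RegisterApplicationMasterResponsePBImpl(proto);
    // initRunningContainersList() rebuilds the list from the proto on demand.
    assert parsed.getContainersFromPreviousAttempt().size() == 1;
  }
}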

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Sat Jan 11 07:07:17 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
@@ -78,6 +79,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -271,6 +273,11 @@ public class ApplicationMasterService ex
             .getClientToAMTokenSecretManager()
             .getMasterKey(applicationAttemptId).getEncoded()));        
       }
+
+      List<Container> containerList =
+          ((AbstractYarnScheduler) rScheduler)
+            .getTransferredContainers(applicationAttemptId);
+      response.setContainersFromPreviousAttempt(containerList);
       return response;
     }
   }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1557318&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Sat Jan 11 07:07:17 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+public class AbstractYarnScheduler {
+
+  protected RMContext rmContext;
+  protected Map<ApplicationId, SchedulerApplication> applications;
+
+  public synchronized List<Container> getTransferredContainers(
+      ApplicationAttemptId currentAttempt) {
+    ApplicationId appId = currentAttempt.getApplicationId();
+    SchedulerApplication app = applications.get(appId);
+    List<Container> containerList = new ArrayList<Container>();
+    RMApp appImpl = this.rmContext.getRMApps().get(appId);
+    if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
+      return containerList;
+    }
+    Collection<RMContainer> liveContainers =
+        app.getCurrentAppAttempt().getLiveContainers();
+    ContainerId amContainerId =
+        rmContext.getRMApps().get(appId).getCurrentAppAttempt()
+          .getMasterContainer().getId();
+    for (RMContainer rmContainer : liveContainers) {
+      if (!rmContainer.getContainerId().equals(amContainerId)) {
+        containerList.add(rmContainer.getContainer());
+      }
+    }
+    return containerList;
+  }
+
+  public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
+    return applications;
+  }
+}
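
AbstractYarnScheduler only asks a concrete scheduler to populate its two protected fields before getTransferredContainers() is called. A minimal, hypothetical subclass for illustration (not in this patch; the real schedulers modified below do the equivalent wiring in their reinitialize() paths):

import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;

public class SketchScheduler extends AbstractYarnScheduler {

  // Wire up the shared state that getTransferredContainers() reads.
  public void initShared(RMContext rmContext) {
    this.rmContext = rmContext;
    this.applications =
        new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
  }
}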

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sat Jan 11 07:07:17 2014
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -95,7 +96,7 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class CapacityScheduler
+public class CapacityScheduler extends AbstractYarnScheduler
   implements PreemptableResourceScheduler, CapacitySchedulerContext,
              Configurable {
 
@@ -177,7 +178,6 @@ public class CapacityScheduler
 
   private CapacitySchedulerConfiguration conf;
   private Configuration yarnConf;
-  private RMContext rmContext;
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
@@ -191,10 +191,6 @@ public class CapacityScheduler
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  @VisibleForTesting
-  protected Map<ApplicationId, SchedulerApplication> applications =
-      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-
   private boolean initialized = false;
 
   private ResourceCalculator calculator;
@@ -271,9 +267,10 @@ public class CapacityScheduler
       this.maximumAllocation = this.conf.getMaximumAllocation();
       this.calculator = this.conf.getResourceCalculator();
       this.usePortForNodeName = this.conf.getUsePortForNodeName();
-
+      this.applications =
+          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
       this.rmContext = rmContext;
-      
+
       initializeQueues(this.conf);
       
       initialized = true;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Sat Jan 11 07:07:17 2014
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -120,10 +121,10 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Unstable
 @SuppressWarnings("unchecked")
-public class FairScheduler implements ResourceScheduler {
+public class FairScheduler extends AbstractYarnScheduler implements
+    ResourceScheduler {
   private boolean initialized;
   private FairSchedulerConfiguration conf;
-  private RMContext rmContext;
   private Resource minimumAllocation;
   private Resource maximumAllocation;
   private Resource incrAllocation;
@@ -157,11 +158,6 @@ public class FairScheduler implements Re
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-  // This stores per-application scheduling information,
-  @VisibleForTesting
-  protected Map<ApplicationId, SchedulerApplication> applications =
-      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-
   // Nodes in the cluster, indexed by NodeId
   private Map<NodeId, FSSchedulerNode> nodes = 
       new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@@ -1235,6 +1231,9 @@ public class FairScheduler implements Re
       
       rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;
+      // This stores per-application scheduling information
+      this.applications =
+          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
       this.eventLog = new FairSchedulerEventLog();
       eventLog.init(this.conf);
 
@@ -1357,5 +1356,4 @@ public class FairScheduler implements Re
     queue.collectSchedulerApplications(apps);
     return apps;
   }
-
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig?rev=1557318&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig Sat Jan 11 07:07:17 2014
@@ -0,0 +1,1361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A scheduler that schedules resources between a set of queues. The scheduler
+ * keeps track of the resources used by each queue, and attempts to maintain
+ * fairness by scheduling tasks at queues whose allocations are farthest below
+ * an ideal fair distribution.
+ * 
+ * The fair scheduler supports hierarchical queues. All queues descend from a
+ * queue named "root". Available resources are distributed among the children
+ * of the root queue in the typical fair scheduling fashion. Then, the children
+ * distribute the resources assigned to them to their children in the same
+ * fashion.  Applications may only be scheduled on leaf queues. Queues can be
+ * specified as children of other queues by placing them as sub-elements of their
+ * parents in the fair scheduler configuration file.
+ * 
+ * A queue's name starts with the names of its parents, with periods as
+ * separators.  So a queue named "queue1" under the root named, would be 
+ * referred to as "root.queue1", and a queue named "queue2" under a queue
+ * named "parent1" would be referred to as "root.parent1.queue2".
+ */
+@LimitedPrivate("yarn")
+@Unstable
+@SuppressWarnings("unchecked")
+public class FairScheduler implements ResourceScheduler {
+  private boolean initialized;
+  private FairSchedulerConfiguration conf;
+  private RMContext rmContext;
+  private Resource minimumAllocation;
+  private Resource maximumAllocation;
+  private Resource incrAllocation;
+  private QueueManager queueMgr;
+  private Clock clock;
+  private boolean usePortForNodeName;
+
+  private static final Log LOG = LogFactory.getLog(FairScheduler.class);
+  
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DefaultResourceCalculator();
+  
+  // Value that container assignment methods return when a container is
+  // reserved
+  public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
+
+  // How often fair shares are re-calculated (ms)
+  protected long UPDATE_INTERVAL = 500;
+
+  private final static List<Container> EMPTY_CONTAINER_LIST =
+      new ArrayList<Container>();
+
+  private static final Allocation EMPTY_ALLOCATION =
+      new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
+
+  // Aggregate metrics
+  FSQueueMetrics rootMetrics;
+
+  // Time when we last updated preemption vars
+  protected long lastPreemptionUpdateTime;
+  // Time we last ran preemptTasksIfNecessary
+  private long lastPreemptCheckTime;
+
+  // This stores per-application scheduling information,
+  @VisibleForTesting
+  protected Map<ApplicationId, SchedulerApplication> applications =
+      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+  // Nodes in the cluster, indexed by NodeId
+  private Map<NodeId, FSSchedulerNode> nodes = 
+      new ConcurrentHashMap<NodeId, FSSchedulerNode>();
+
+  // Aggregate capacity of the cluster
+  private Resource clusterCapacity = 
+      RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
+
+  // How often tasks are preempted 
+  protected long preemptionInterval; 
+  
+  // ms to wait before force killing stuff (must be longer than a couple
+  // of heartbeats to give task-kill commands a chance to act).
+  protected long waitTimeBeforeKill; 
+  
+  // Containers whose AMs have been warned that they will be preempted soon.
+  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+  
+  protected boolean preemptionEnabled;
+  protected boolean sizeBasedWeight; // Give larger weights to larger jobs
+  protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
+  protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
+  protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
+  private Comparator nodeAvailableResourceComparator =
+          new NodeAvailableResourceComparator(); // Node available resource comparator
+  protected double nodeLocalityThreshold; // Cluster threshold for node locality
+  protected double rackLocalityThreshold; // Cluster threshold for rack locality
+  protected long nodeLocalityDelayMs; // Delay for node locality
+  protected long rackLocalityDelayMs; // Delay for rack locality
+  private FairSchedulerEventLog eventLog; // Machine-readable event log
+  protected boolean assignMultiple; // Allocate multiple containers per
+                                    // heartbeat
+  protected int maxAssign; // Max containers to assign per heartbeat
+
+  @VisibleForTesting
+  final MaxRunningAppsEnforcer maxRunningEnforcer;
+
+  private AllocationFileLoaderService allocsLoader;
+  @VisibleForTesting
+  AllocationConfiguration allocConf;
+  
+  public FairScheduler() {
+    clock = new SystemClock();
+    allocsLoader = new AllocationFileLoaderService();
+    queueMgr = new QueueManager(this);
+    maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
+  }
+
+  private void validateConf(Configuration conf) {
+    // validate scheduler memory allocation setting
+    int minMem = conf.getInt(
+      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    int maxMem = conf.getInt(
+      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+
+    if (minMem < 0 || minMem > maxMem) {
+      throw new YarnRuntimeException("Invalid resource scheduler memory"
+        + " allocation configuration"
+        + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+        + "=" + minMem
+        + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+        + "=" + maxMem + ", min should equal greater than 0"
+        + ", max should be no smaller than min.");
+    }
+
+    // validate scheduler vcores allocation setting
+    int minVcores = conf.getInt(
+      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+    int maxVcores = conf.getInt(
+      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+    if (minVcores < 0 || minVcores > maxVcores) {
+      throw new YarnRuntimeException("Invalid resource scheduler vcores"
+        + " allocation configuration"
+        + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+        + "=" + minVcores
+        + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+        + "=" + maxVcores + ", min should equal greater than 0"
+        + ", max should be no smaller than min.");
+    }
+  }
+
+  public FairSchedulerConfiguration getConf() {
+    return conf;
+  }
+
+  public QueueManager getQueueManager() {
+    return queueMgr;
+  }
+
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
+    FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+    return (attempt == null) ? null : attempt.getRMContainer(containerId);
+  }
+
+  private FSSchedulerApp getCurrentAttemptForContainer(
+      ContainerId containerId) {
+    SchedulerApplication app =
+        applications.get(containerId.getApplicationAttemptId()
+          .getApplicationId());
+    if (app != null) {
+      return (FSSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
+  }
+
+  /**
+   * A runnable which calls {@link FairScheduler#update()} every
+   * <code>UPDATE_INTERVAL</code> milliseconds.
+   */
+  private class UpdateThread implements Runnable {
+    public void run() {
+      while (true) {
+        try {
+          Thread.sleep(UPDATE_INTERVAL);
+          update();
+          preemptTasksIfNecessary();
+        } catch (Exception e) {
+          LOG.error("Exception in fair scheduler UpdateThread", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Recompute the internal variables used by the scheduler - per-job weights,
+   * fair shares, deficits, minimum slot allocations, and amount of used and
+   * required resources per job.
+   */
+  protected synchronized void update() {
+    updatePreemptionVariables(); // Determine if any queues merit preemption
+
+    FSQueue rootQueue = queueMgr.getRootQueue();
+
+    // Recursively update demands for all queues
+    rootQueue.updateDemand();
+
+    rootQueue.setFairShare(clusterCapacity);
+    // Recursively compute fair shares for all queues
+    // and update metrics
+    rootQueue.recomputeShares();
+  }
+
+  /**
+   * Update the preemption fields for all QueueScheduables, i.e. the times since
+   * each queue last was at its guaranteed share and at > 1/2 of its fair share
+   * for each type of task.
+   */
+  private void updatePreemptionVariables() {
+    long now = clock.getTime();
+    lastPreemptionUpdateTime = now;
+    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
+      if (!isStarvedForMinShare(sched)) {
+        sched.setLastTimeAtMinShare(now);
+      }
+      if (!isStarvedForFairShare(sched)) {
+        sched.setLastTimeAtHalfFairShare(now);
+      }
+    }
+  }
+
+  /**
+   * Is a queue below its min share for the given task type?
+   */
+  boolean isStarvedForMinShare(FSLeafQueue sched) {
+    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+      sched.getMinShare(), sched.getDemand());
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+        sched.getResourceUsage(), desiredShare);
+  }
+
+  /**
+   * Is a queue being starved for fair share for the given task type? This is
+   * defined as being below half its fair share.
+   */
+  boolean isStarvedForFairShare(FSLeafQueue sched) {
+    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+        Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+        sched.getResourceUsage(), desiredFairShare);
+  }
+
+  /**
+   * Check for queues that need tasks preempted, either because they have been
+   * below their guaranteed share for minSharePreemptionTimeout or they have
+   * been below half their fair share for the fairSharePreemptionTimeout. If
+   * such queues exist, compute how many tasks of each type need to be preempted
+   * and then select the right ones using preemptTasks.
+   */
+  protected synchronized void preemptTasksIfNecessary() {
+    if (!preemptionEnabled) {
+      return;
+    }
+
+    long curTime = clock.getTime();
+    if (curTime - lastPreemptCheckTime < preemptionInterval) {
+      return;
+    }
+    lastPreemptCheckTime = curTime;
+
+    Resource resToPreempt = Resources.none();
+
+    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
+      resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
+    }
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
+        Resources.none())) {
+      preemptResources(queueMgr.getLeafQueues(), resToPreempt);
+    }
+  }
+
+  /**
+   * Preempt a quantity of resources from a list of QueueSchedulables. The
+   * policy for this is to pick apps from queues that are over their fair share,
+   * but make sure that no queue is placed below its fair share in the process.
+   * We further prioritize preemption by choosing containers with lowest
+   * priority to preempt.
+   */
+  protected void preemptResources(Collection<FSLeafQueue> scheds,
+      Resource toPreempt) {
+    if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
+      return;
+    }
+
+    Map<RMContainer, FSSchedulerApp> apps = 
+        new HashMap<RMContainer, FSSchedulerApp>();
+    Map<RMContainer, FSLeafQueue> queues = 
+        new HashMap<RMContainer, FSLeafQueue>();
+
+    // Collect running containers from over-scheduled queues
+    List<RMContainer> runningContainers = new ArrayList<RMContainer>();
+    for (FSLeafQueue sched : scheds) {
+      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getResourceUsage(), sched.getFairShare())) {
+        for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
+          for (RMContainer c : as.getApp().getLiveContainers()) {
+            runningContainers.add(c);
+            apps.put(c, as.getApp());
+            queues.put(c, sched);
+          }
+        }
+      }
+    }
+
+    // Sort containers into reverse order of priority
+    Collections.sort(runningContainers, new Comparator<RMContainer>() {
+      public int compare(RMContainer c1, RMContainer c2) {
+        int ret = c1.getContainer().getPriority().compareTo(
+            c2.getContainer().getPriority());
+        if (ret == 0) {
+          return c2.getContainerId().compareTo(c1.getContainerId());
+        }
+        return ret;
+      }
+    });
+    
+    // Scan down the list of containers we've already warned and kill them
+    // if we need to.  Remove any containers from the list that we don't need
+    // or that are no longer running.
+    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
+    while (warnedIter.hasNext()) {
+      RMContainer container = warnedIter.next();
+      if (container.getState() == RMContainerState.RUNNING &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              toPreempt, Resources.none())) {
+        warnOrKillContainer(container, apps.get(container), queues.get(container));
+        preemptedThisRound.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      } else {
+        warnedIter.remove();
+      }
+    }
+
+    // Scan down the rest of the containers until we've preempted enough, making
+    // sure we don't preempt too many from any queue
+    Iterator<RMContainer> runningIter = runningContainers.iterator();
+    while (runningIter.hasNext() &&
+        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+            toPreempt, Resources.none())) {
+      RMContainer container = runningIter.next();
+      FSLeafQueue sched = queues.get(container);
+      if (!preemptedThisRound.contains(container) &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              sched.getResourceUsage(), sched.getFairShare())) {
+        warnOrKillContainer(container, apps.get(container), sched);
+        
+        warnedContainers.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      }
+    }
+  }
+  
+  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+      FSLeafQueue queue) {
+    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
+        "res=" + container.getContainer().getResource() +
+        ") from queue " + queue.getName());
+    
+    Long time = app.getContainerPreemptionTime(container);
+
+    if (time != null) {
+      // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
+      // proceed with kill
+      if (time + waitTimeBeforeKill < clock.getTime()) {
+        ContainerStatus status =
+          SchedulerUtils.createPreemptedContainerStatus(
+            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
+
+        // TODO: Not sure if this ever actually adds this to the list of cleanup
+        // containers on the RMNode (see SchedulerNode.releaseContainer()).
+        completedContainer(container, status, RMContainerEventType.KILL);
+        LOG.info("Killing container" + container +
+            " (after waiting for premption for " +
+            (clock.getTime() - time) + "ms)");
+      }
+    } else {
+      // track the request in the FSSchedulerApp itself
+      app.addPreemption(container, clock.getTime());
+    }
+  }
+
+  /**
+   * Return the resource amount that this queue is allowed to preempt, if any.
+   * If the queue has been below its min share for at least its preemption
+   * timeout, it should preempt the difference between its current share and
+   * this min share. If it has been below half its fair share for at least the
+   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
+   * full fair share. If both conditions hold, we preempt the max of the two
+   * amounts (this shouldn't happen unless someone sets the timeouts to be
+   * identical for some reason).
+   */
+  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
+    String queue = sched.getName();
+    long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
+    long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
+    Resource resDueToMinShare = Resources.none();
+    Resource resDueToFairShare = Resources.none();
+    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getMinShare(), sched.getDemand());
+      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
+    }
+    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getFairShare(), sched.getDemand());
+      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
+    }
+    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+        resDueToMinShare, resDueToFairShare);
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+        resToPreempt, Resources.none())) {
+      String message = "Should preempt " + resToPreempt + " res for queue "
+          + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
+          + ", resDueToFairShare = " + resDueToFairShare;
+      LOG.info(message);
+    }
+    return resToPreempt;
+  }
+
+  public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+    return rmContext.getContainerTokenSecretManager();
+  }
+
+  // synchronized for sizeBasedWeight
+  public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
+    double weight = 1.0;
+    if (sizeBasedWeight) {
+      // Set weight based on current memory demand
+      weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
+    }
+    weight *= app.getPriority().getPriority();
+    if (weightAdjuster != null) {
+      // Run weight through the user-supplied weightAdjuster
+      weight = weightAdjuster.adjustWeight(app, weight);
+    }
+    return new ResourceWeights((float)weight);
+  }
+
+  @Override
+  public Resource getMinimumResourceCapability() {
+    return minimumAllocation;
+  }
+
+  public Resource getIncrementResourceCapability() {
+    return incrAllocation;
+  }
+
+  @Override
+  public Resource getMaximumResourceCapability() {
+    return maximumAllocation;
+  }
+
+  public double getNodeLocalityThreshold() {
+    return nodeLocalityThreshold;
+  }
+
+  public double getRackLocalityThreshold() {
+    return rackLocalityThreshold;
+  }
+
+  public long getNodeLocalityDelayMs() {
+    return nodeLocalityDelayMs;
+  }
+
+  public long getRackLocalityDelayMs() {
+    return rackLocalityDelayMs;
+  }
+
+  public boolean isContinuousSchedulingEnabled() {
+    return continuousSchedulingEnabled;
+  }
+
+  public synchronized int getContinuousSchedulingSleepMs() {
+    return continuousSchedulingSleepMs;
+  }
+
+  public Resource getClusterCapacity() {
+    return clusterCapacity;
+  }
+
+  public synchronized Clock getClock() {
+    return clock;
+  }
+
+  protected synchronized void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  public FairSchedulerEventLog getEventLog() {
+    return eventLog;
+  }
+
+  /**
+   * Add a new application to the scheduler, with a given id, queue name, and
+   * user. This will accept a new app even if the user or queue is above
+   * configured limits, but the app will not be marked as runnable.
+   */
+  protected synchronized void addApplication(ApplicationId applicationId,
+      String queueName, String user) {
+    if (queueName == null || queueName.isEmpty()) {
+      String message = "Reject application " + applicationId +
+              " submitted by user " + user + " with an empty queue name.";
+      LOG.info(message);
+      rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, message));
+      return;
+    }
+
+    RMApp rmApp = rmContext.getRMApps().get(applicationId);
+    FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+    if (queue == null) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppRejectedEvent(applicationId,
+              "Application rejected by queue placement policy"));
+      return;
+    }
+
+    // Enforce ACLs
+    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
+
+    if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
+        && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+      String msg = "User " + userUgi.getUserName() +
+              " cannot submit applications to queue " + queue.getName();
+      LOG.info(msg);
+      rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, msg));
+      return;
+    }
+  
+    SchedulerApplication application =
+        new SchedulerApplication(queue, user);
+    applications.put(applicationId, application);
+    queue.getMetrics().submitApp(user);
+
+    LOG.info("Accepted application " + applicationId + " from user: " + user
+        + ", in queue: " + queueName + ", currently num of applications: "
+        + applications.size());
+    rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+  }
+
+  /**
+   * Add a new application attempt to the scheduler.
+   */
+  protected synchronized void addApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt) {
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    String user = application.getUser();
+    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
+
+    FSSchedulerApp attempt =
+        new FSSchedulerApp(applicationAttemptId, user,
+            queue, new ActiveUsersManager(getRootQueueMetrics()),
+            rmContext);
+    if (transferStateFromPreviousAttempt) {
+      attempt.transferStateFromPreviousAttempt(application
+        .getCurrentAppAttempt());
+    }
+    application.setCurrentAppAttempt(attempt);
+
+    boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+    queue.addApp(attempt, runnable);
+    if (runnable) {
+      maxRunningEnforcer.trackRunnableApp(attempt);
+    } else {
+      maxRunningEnforcer.trackNonRunnableApp(attempt);
+    }
+    
+    queue.getMetrics().submitAppAttempt(user);
+
+    LOG.info("Added Application Attempt " + applicationAttemptId
+        + " to scheduler from user: " + user);
+    rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
+  }
+  
+  @VisibleForTesting
+  FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
+    FSLeafQueue queue = null;
+    try {
+      QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
+      queueName = placementPolicy.assignAppToQueue(queueName, user);
+      if (queueName == null) {
+        return null;
+      }
+      queue = queueMgr.getLeafQueue(queueName, true);
+    } catch (IOException ex) {
+      LOG.error("Error assigning app to queue, rejecting", ex);
+      // Reject the app here; falling through with a null queue would NPE below.
+      return null;
+    }
+    
+    if (rmApp != null) {
+      rmApp.setQueue(queue.getName());
+    } else {
+      LOG.warn("Couldn't find RM app to set queue name on");
+    }
+    
+    return queue;
+  }
+
+  private synchronized void removeApplication(ApplicationId applicationId,
+      RMAppState finalState) {
+    SchedulerApplication application = applications.get(applicationId);
+    if (application == null){
+      LOG.warn("Couldn't find application " + applicationId);
+      return;
+    }
+    application.stop(finalState);
+    applications.remove(applicationId);
+  }
+
+  private synchronized void removeApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId,
+      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
+    LOG.info("Application " + applicationAttemptId + " is done." +
+        " finalState=" + rmAppAttemptFinalState);
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
+
+    if (attempt == null || application == null) {
+      LOG.info("Unknown application " + applicationAttemptId + " has completed!");
+      return;
+    }
+
+    // Release all the running containers
+    for (RMContainer rmContainer : attempt.getLiveContainers()) {
+      if (keepContainers
+          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
+        // do not kill the running container in the case of work-preserving AM
+        // restart.
+        LOG.info("Skip killing " + rmContainer.getContainerId());
+        continue;
+      }
+      completedContainer(rmContainer,
+          SchedulerUtils.createAbnormalContainerStatus(
+              rmContainer.getContainerId(),
+              SchedulerUtils.COMPLETED_APPLICATION),
+              RMContainerEventType.KILL);
+    }
+
+    // Release all reserved containers
+    for (RMContainer rmContainer : attempt.getReservedContainers()) {
+      completedContainer(rmContainer,
+          SchedulerUtils.createAbnormalContainerStatus(
+              rmContainer.getContainerId(),
+              "Application Complete"),
+              RMContainerEventType.KILL);
+    }
+    // Clean up pending requests, metrics etc.
+    attempt.stop(rmAppAttemptFinalState);
+
+    // Inform the queue
+    FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
+        .getQueueName(), false);
+    boolean wasRunnable = queue.removeApp(attempt);
+
+    if (wasRunnable) {
+      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
+    } else {
+      maxRunningEnforcer.untrackNonRunnableApp(attempt);
+    }
+  }
+
+  /**
+   * Clean up a completed container.
+   */
+  private synchronized void completedContainer(RMContainer rmContainer,
+      ContainerStatus containerStatus, RMContainerEventType event) {
+    if (rmContainer == null) {
+      LOG.info("Null container completed...");
+      return;
+    }
+
+    Container container = rmContainer.getContainer();
+
+    // Get the application for the finished container
+    FSSchedulerApp application =
+        getCurrentAttemptForContainer(container.getId());
+    ApplicationId appId =
+        container.getId().getApplicationAttemptId().getApplicationId();
+    if (application == null) {
+      LOG.info("Container " + container + " of" +
+          " unknown application " + appId +
+          " completed with event " + event);
+      return;
+    }
+
+    // Get the node on which the container was allocated
+    FSSchedulerNode node = nodes.get(container.getNodeId());
+
+    if (rmContainer.getState() == RMContainerState.RESERVED) {
+      application.unreserve(node, rmContainer.getReservedPriority());
+      node.unreserveResource(application);
+    } else {
+      application.containerCompleted(rmContainer, containerStatus, event);
+      node.releaseContainer(container);
+      updateRootQueueMetrics();
+    }
+
+    LOG.info("Application attempt " + application.getApplicationAttemptId()
+        + " released container " + container.getId() + " on node: " + node
+        + " with event: " + event);
+  }
+
+  private synchronized void addNode(RMNode node) {
+    nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
+    Resources.addTo(clusterCapacity, node.getTotalCapability());
+    updateRootQueueMetrics();
+
+    LOG.info("Added node " + node.getNodeAddress() +
+        " cluster capacity: " + clusterCapacity);
+  }
+
+  private synchronized void removeNode(RMNode rmNode) {
+    FSSchedulerNode node = nodes.get(rmNode.getNodeID());
+    // This can occur when an UNHEALTHY node reconnects
+    if (node == null) {
+      return;
+    }
+    Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+    updateRootQueueMetrics();
+
+    // Remove running containers
+    List<RMContainer> runningContainers = node.getRunningContainers();
+    for (RMContainer container : runningContainers) {
+      completedContainer(container,
+          SchedulerUtils.createAbnormalContainerStatus(
+              container.getContainerId(),
+              SchedulerUtils.LOST_CONTAINER),
+          RMContainerEventType.KILL);
+    }
+
+    // Remove reservations, if any
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      completedContainer(reservedContainer,
+          SchedulerUtils.createAbnormalContainerStatus(
+              reservedContainer.getContainerId(),
+              SchedulerUtils.LOST_CONTAINER),
+          RMContainerEventType.KILL);
+    }
+
+    nodes.remove(rmNode.getNodeID());
+    LOG.info("Removed node " + rmNode.getNodeAddress() +
+        " cluster capacity: " + clusterCapacity);
+  }
+
+  @Override
+  public Allocation allocate(ApplicationAttemptId appAttemptId,
+      List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
+
+    // Make sure this application exists
+    FSSchedulerApp application = getSchedulerApp(appAttemptId);
+    if (application == null) {
+      LOG.info("Calling allocate on removed " +
+          "or non-existent application " + appAttemptId);
+      return EMPTY_ALLOCATION;
+    }
+
+    // Sanity check
+    SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
+        clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
+
+    // Release containers
+    for (ContainerId releasedContainerId : release) {
+      RMContainer rmContainer = getRMContainer(releasedContainerId);
+      if (rmContainer == null) {
+        RMAuditLogger.logFailure(application.getUser(),
+            AuditConstants.RELEASE_CONTAINER,
+            "Unauthorized access or invalid container", "FairScheduler",
+            "Trying to release container not owned by app or with invalid id",
+            application.getApplicationId(), releasedContainerId);
+      }
+      completedContainer(rmContainer,
+          SchedulerUtils.createAbnormalContainerStatus(
+              releasedContainerId,
+              SchedulerUtils.RELEASED_CONTAINER),
+          RMContainerEventType.RELEASED);
+    }
+
+    synchronized (application) {
+      if (!ask.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("allocate: pre-update" +
+              " applicationAttemptId=" + appAttemptId +
+              " application=" + application.getApplicationId());
+        }
+        application.showRequests();
+
+        // Update application requests
+        application.updateResourceRequests(ask);
+
+        LOG.debug("allocate: post-update");
+        application.showRequests();
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate:" +
+            " applicationAttemptId=" + appAttemptId +
+            " #ask=" + ask.size());
+
+        LOG.debug("Preempting " + application.getPreemptionContainers().size()
+            + " container(s)");
+      }
+      
+      Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+      for (RMContainer container : application.getPreemptionContainers()) {
+        preemptionContainerIds.add(container.getContainerId());
+      }
+
+      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+      
+      return new Allocation(application.pullNewlyAllocatedContainers(),
+          application.getHeadroom(), preemptionContainerIds);
+    }
+  }
+
+  /**
+   * Process a container which has launched on a node, as reported by the node.
+   */
+  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
+    // Get the application for the finished container
+    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
+    if (application == null) {
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
+      return;
+    }
+
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
+  }
+
+  /**
+   * Process a heartbeat update from a node.
+   */
+  private synchronized void nodeUpdate(RMNode nm) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+    }
+    eventLog.log("HEARTBEAT", nm.getHostName());
+    FSSchedulerNode node = nodes.get(nm.getNodeID());
+
+    // Update resource if any change
+    SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
+    
+    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+    for(UpdatedContainerInfo containerInfo : containerInfoList) {
+      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+      completedContainers.addAll(containerInfo.getCompletedContainers());
+    } 
+    // Processing the newly launched containers
+    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+    }
+
+    // Process completed containers
+    for (ContainerStatus completedContainer : completedContainers) {
+      ContainerId containerId = completedContainer.getContainerId();
+      LOG.debug("Container FINISHED: " + containerId);
+      completedContainer(getRMContainer(containerId),
+          completedContainer, RMContainerEventType.FINISHED);
+    }
+
+    if (continuousSchedulingEnabled) {
+      if (!completedContainers.isEmpty()) {
+        attemptScheduling(node);
+      }
+    } else {
+      attemptScheduling(node);
+    }
+  }
+
+  private void continuousScheduling() {
+    while (true) {
+      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+
+      // iterate all nodes
+      for (NodeId nodeId : nodeIdList) {
+        if (nodes.containsKey(nodeId)) {
+          FSSchedulerNode node = nodes.get(nodeId);
+          try {
+            if (Resources.fitsIn(minimumAllocation,
+                    node.getAvailableResource())) {
+              attemptScheduling(node);
+            }
+          } catch (Throwable ex) {
+            LOG.warn("Error while attempting scheduling for node " + node +
+                    ": " + ex.toString(), ex);
+          }
+        }
+      }
+      try {
+        Thread.sleep(getContinuousSchedulingSleepMs());
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while sleeping in continuous scheduling: " +
+                e.toString(), e);
+      }
+    }
+  }
+
+  /** Sort nodes by available resource */
+  private class NodeAvailableResourceComparator implements Comparator<NodeId> {
+
+    @Override
+    public int compare(NodeId n1, NodeId n2) {
+      return RESOURCE_CALCULATOR.compare(clusterCapacity,
+              nodes.get(n2).getAvailableResource(),
+              nodes.get(n1).getAvailableResource());
+    }
+  }
+  
+  private synchronized void attemptScheduling(FSSchedulerNode node) {
+    // Assign new containers...
+    // 1. Check for reserved applications
+    // 2. Schedule if there are no reservations
+
+    AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
+    if (reservedAppSchedulable != null) {
+      Priority reservedPriority = node.getReservedContainer().getReservedPriority();
+      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+        // Don't hold the reservation if app can no longer use it
+        LOG.info("Releasing reservation that cannot be satisfied for application "
+            + reservedAppSchedulable.getApp().getApplicationAttemptId()
+            + " on node " + node);
+        reservedAppSchedulable.unreserve(reservedPriority, node);
+        reservedAppSchedulable = null;
+      } else {
+        // Reservation exists; try to fulfill the reservation
+        LOG.info("Trying to fulfill reservation for application "
+            + reservedAppSchedulable.getApp().getApplicationAttemptId()
+            + " on node: " + node);
+
+        node.getReservedAppSchedulable().assignReservedContainer(node);
+      }
+    }
+    if (reservedAppSchedulable == null) {
+      // No reservation, schedule at queue which is farthest below fair share
+      int assignedContainers = 0;
+      while (node.getReservedContainer() == null) {
+        boolean assignedContainer = false;
+        if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              queueMgr.getRootQueue().assignContainer(node),
+              Resources.none())) {
+          assignedContainers++;
+          assignedContainer = true;
+        }
+        if (!assignedContainer) { break; }
+        if (!assignMultiple) { break; }
+        if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
+      }
+    }
+    updateRootQueueMetrics();
+  }
+
+  @Override
+  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
+    FSSchedulerNode node = nodes.get(nodeId);
+    return node == null ? null : new SchedulerNodeReport(node);
+  }
+  
+  public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
+    SchedulerApplication app =
+        applications.get(appAttemptId.getApplicationId());
+    if (app != null) {
+      return (FSSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
+  }
+  
+  @Override
+  public SchedulerAppReport getSchedulerAppInfo(
+      ApplicationAttemptId appAttemptId) {
+    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+    if (attempt == null) {
+      LOG.error("Request for appInfo of unknown attempt " + appAttemptId);
+      return null;
+    }
+    return new SchedulerAppReport(attempt);
+  }
+  
+  @Override
+  public ApplicationResourceUsageReport getAppResourceUsageReport(
+      ApplicationAttemptId appAttemptId) {
+    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
+    if (attempt == null) {
+      LOG.error("Request for appInfo of unknown attempt " + appAttemptId);
+      return null;
+    }
+    return attempt.getResourceUsageReport();
+  }
+  
+  /**
+   * Subqueue metrics might be a little out of date because fair shares are
+   * recalculated at the update interval, but the root queue metrics need to
+   * be updated synchronously with allocations and completions so that cluster
+   * metrics will be consistent.
+   */
+  private void updateRootQueueMetrics() {
+    rootMetrics.setAvailableResourcesToQueue(
+        Resources.subtract(
+            clusterCapacity, rootMetrics.getAllocatedResources()));
+  }
+
+  @Override
+  public QueueMetrics getRootQueueMetrics() {
+    return rootMetrics;
+  }
+
+  @Override
+  public void handle(SchedulerEvent event) {
+    switch (event.getType()) {
+    case NODE_ADDED:
+      if (!(event instanceof NodeAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
+      addNode(nodeAddedEvent.getAddedRMNode());
+      break;
+    case NODE_REMOVED:
+      if (!(event instanceof NodeRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
+      removeNode(nodeRemovedEvent.getRemovedRMNode());
+      break;
+    case NODE_UPDATE:
+      if (!(event instanceof NodeUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+      nodeUpdate(nodeUpdatedEvent.getRMNode());
+      break;
+    case APP_ADDED:
+      if (!(event instanceof AppAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      addApplication(appAddedEvent.getApplicationId(),
+        appAddedEvent.getQueue(), appAddedEvent.getUser());
+      break;
+    case APP_REMOVED:
+      if (!(event instanceof AppRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+      removeApplication(appRemovedEvent.getApplicationID(),
+        appRemovedEvent.getFinalState());
+      break;
+    case APP_ATTEMPT_ADDED:
+      if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+          (AppAttemptAddedSchedulerEvent) event;
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+      break;
+    case APP_ATTEMPT_REMOVED:
+      if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+          (AppAttemptRemovedSchedulerEvent) event;
+      removeApplicationAttempt(
+          appAttemptRemovedEvent.getApplicationAttemptID(),
+          appAttemptRemovedEvent.getFinalAttemptState(),
+          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
+      break;
+    case CONTAINER_EXPIRED:
+      if (!(event instanceof ContainerExpiredSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      ContainerExpiredSchedulerEvent containerExpiredEvent =
+          (ContainerExpiredSchedulerEvent)event;
+      ContainerId containerId = containerExpiredEvent.getContainerId();
+      completedContainer(getRMContainer(containerId),
+          SchedulerUtils.createAbnormalContainerStatus(
+              containerId,
+              SchedulerUtils.EXPIRED_CONTAINER),
+          RMContainerEventType.EXPIRE);
+      break;
+    default:
+      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
+    }
+  }
+
+  @Override
+  public void recover(RMState state) throws Exception {
+    // NOT IMPLEMENTED
+  }
+
+  @Override
+  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+      throws IOException {
+    if (!initialized) {
+      this.conf = new FairSchedulerConfiguration(conf);
+      validateConf(this.conf);
+      minimumAllocation = this.conf.getMinimumAllocation();
+      maximumAllocation = this.conf.getMaximumAllocation();
+      incrAllocation = this.conf.getIncrementAllocation();
+      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+      continuousSchedulingSleepMs =
+              this.conf.getContinuousSchedulingSleepMs();
+      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+      preemptionEnabled = this.conf.getPreemptionEnabled();
+      assignMultiple = this.conf.getAssignMultiple();
+      maxAssign = this.conf.getMaxAssign();
+      sizeBasedWeight = this.conf.getSizeBasedWeight();
+      preemptionInterval = this.conf.getPreemptionInterval();
+      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+      usePortForNodeName = this.conf.getUsePortForNodeName();
+      
+      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+      this.rmContext = rmContext;
+      this.eventLog = new FairSchedulerEventLog();
+      eventLog.init(this.conf);
+
+      initialized = true;
+
+      allocConf = new AllocationConfiguration(conf);
+      try {
+        queueMgr.initialize(conf);
+      } catch (Exception e) {
+        throw new IOException("Failed to start FairScheduler", e);
+      }
+
+      Thread updateThread = new Thread(new UpdateThread());
+      updateThread.setName("FairSchedulerUpdateThread");
+      updateThread.setDaemon(true);
+      updateThread.start();
+
+      if (continuousSchedulingEnabled) {
+        // start continuous scheduling thread
+        Thread schedulingThread = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              continuousScheduling();
+            }
+          }
+        );
+        schedulingThread.setName("ContinuousScheduling");
+        schedulingThread.setDaemon(true);
+        schedulingThread.start();
+      }
+      
+      allocsLoader.init(conf);
+      allocsLoader.setReloadListener(new AllocationReloadListener());
+      // If we fail to load allocations file on initialize, we want to fail
+      // immediately.  After a successful load, exceptions on future reloads
+      // will just result in leaving things as they are.
+      try {
+        allocsLoader.reloadAllocations();
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize FairScheduler", e);
+      }
+      allocsLoader.start();
+    } else {
+      try {
+        allocsLoader.reloadAllocations();
+      } catch (Exception e) {
+        LOG.error("Failed to reload allocations file", e);
+      }
+    }
+  }
+
+  @Override
+  public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
+      boolean recursive) throws IOException {
+    if (!queueMgr.exists(queueName)) {
+      throw new IOException("queue " + queueName + " does not exist");
+    }
+    return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
+        recursive);
+  }
+
+  @Override
+  public List<QueueUserACLInfo> getQueueUserAclInfo() {
+    UserGroupInformation user = null;
+    try {
+      user = UserGroupInformation.getCurrentUser();
+    } catch (IOException ioe) {
+      return new ArrayList<QueueUserACLInfo>();
+    }
+
+    return queueMgr.getRootQueue().getQueueUserAclInfo(user);
+  }
+
+  @Override
+  public int getNumClusterNodes() {
+    return nodes.size();
+  }
+
+  @Override
+  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+      QueueACL acl, String queueName) {
+    FSQueue queue = getQueueManager().getQueue(queueName);
+    if (queue == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ACL not found for queue access-type " + acl
+            + " for queue " + queueName);
+      }
+      return false;
+    }
+    return queue.hasAccess(acl, callerUGI);
+  }
+  
+  public AllocationConfiguration getAllocationConfiguration() {
+    return allocConf;
+  }
+  
+  private class AllocationReloadListener implements
+      AllocationFileLoaderService.Listener {
+
+    @Override
+    public void onReload(AllocationConfiguration queueInfo) {
+      // Commit the reload; also create any queue defined in the alloc file
+      // if it does not already exist, so it can be displayed on the web UI.
+      synchronized (FairScheduler.this) {
+        allocConf = queueInfo;
+        allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
+        queueMgr.updateAllocationConfiguration(allocConf);
+      }
+    }
+  }
+
+  @Override
+  public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
+    FSQueue queue = queueMgr.getQueue(queueName);
+    if (queue == null) {
+      return null;
+    }
+    List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
+    queue.collectSchedulerApplications(apps);
+    return apps;
+  }
+
+}
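
The work-preserving path added to removeApplicationAttempt() above reduces to one decision per live container: skip the kill only when the caller asked to keep containers and the container is still RUNNING. A minimal standalone sketch of that decision, using a hypothetical State enum and shouldKill helper rather than the scheduler's real types:

    import java.util.Arrays;
    import java.util.List;

    public class WorkPreservingFilterSketch {
      // Hypothetical stand-in for RMContainerState; only the states the check cares about.
      enum State { RUNNING, ALLOCATED, RESERVED }

      // Mirrors the check in removeApplicationAttempt(): a container survives only if
      // keepContainers is set and it is still RUNNING; everything else is killed.
      static boolean shouldKill(State state, boolean keepContainers) {
        return !(keepContainers && state == State.RUNNING);
      }

      public static void main(String[] args) {
        List<State> live = Arrays.asList(State.RUNNING, State.ALLOCATED);
        for (State s : live) {
          // With keepContainers=true this prints: RUNNING -> keep, ALLOCATED -> kill
          System.out.println(s + " -> " + (shouldKill(s, true) ? "kill" : "keep"));
        }
      }
    }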

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sat Jan 11 07:07:17 2014
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -104,7 +105,8 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class FifoScheduler implements ResourceScheduler, Configurable {
+public class FifoScheduler extends AbstractYarnScheduler implements
+    ResourceScheduler, Configurable {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
 
@@ -115,7 +117,6 @@ public class FifoScheduler implements Re
 
   private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
   private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
-  private RMContext rmContext;
 
   protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
 
@@ -124,11 +125,6 @@ public class FifoScheduler implements Re
   private Resource maximumAllocation;
   private boolean usePortForNodeName;
 
-  // Use ConcurrentSkipListMap because applications need to be ordered
-  @VisibleForTesting
-  protected Map<ApplicationId, SchedulerApplication> applications =
-      new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
-
   private ActiveUsersManager activeUsersManager;
 
   private static final String DEFAULT_QUEUE_NAME = "default";
@@ -243,6 +239,9 @@ public class FifoScheduler implements Re
     if (!this.initialized) {
       validateConf(conf);
       this.rmContext = rmContext;
+      //Use ConcurrentSkipListMap because applications need to be ordered
+      this.applications =
+          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
       this.minimumAllocation = 
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Sat Jan 11 07:07:17 2014
@@ -24,6 +24,7 @@ import java.util.List;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -150,7 +151,29 @@ public class TestAMRestart {
     ApplicationAttemptId newAttemptId =
         app1.getCurrentAppAttempt().getAppAttemptId();
     Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
-    MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
+
+    // launch the new AM
+    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
+    nm1.nodeHeartbeat(true);
+    MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId());
+    RegisterApplicationMasterResponse registerResponse =
+        am2.registerAppAttempt();
+
+    // Assert two containers are running: container2 and container3;
+    Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt()
+      .size());
+    boolean containerId2Exists = false, containerId3Exists = false;
+    for (Container container : registerResponse
+      .getContainersFromPreviousAttempt()) {
+      if (container.getId().equals(containerId2)) {
+        containerId2Exists = true;
+      }
+      if (container.getId().equals(containerId3)) {
+        containerId3Exists = true;
+      }
+    }
+    Assert.assertTrue(containerId2Exists && containerId3Exists);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
 
     // complete container by sending the container complete event which has earlier
     // attempt's attemptId
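
As the added assertions show, the containers that survive the first attempt are reported back to the new AM through RegisterApplicationMasterResponse.getContainersFromPreviousAttempt(). An AM written against this API could pick them up at registration time with a small helper along these lines (hypothetical class and method names; only the accessors exercised by the test are used):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;

    public class PreviousAttemptContainers {
      /**
       * Collects the ids of containers the ResourceManager kept alive across the
       * AM restart so the new attempt can adopt them instead of requesting fresh
       * resources. Hypothetical helper, not part of this patch.
       */
      public static List<ContainerId> survivingContainerIds(
          RegisterApplicationMasterResponse registerResponse) {
        List<ContainerId> ids = new ArrayList<ContainerId>();
        for (Container container : registerResponse.getContainersFromPreviousAttempt()) {
          ids.add(container.getId());
        }
        return ids;
      }
    }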

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1557318&r1=1557317&r2=1557318&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Sat Jan 11 07:07:17 2014
@@ -642,7 +642,7 @@ public class TestCapacityScheduler {
 
     SchedulerApplication app =
         TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
-          cs.applications, cs, "a1");
+          cs.getSchedulerApplications(), cs, "a1");
     Assert.assertEquals("a1", app.getQueue().getQueueName());
   }
  }


