hadoop-yarn-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r1502084 [1/2] - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/ha...
Date Thu, 11 Jul 2013 01:20:38 GMT
Author: cdouglas
Date: Thu Jul 11 01:20:37 2013
New Revision: 1502084

URL: http://svn.apache.org/r1502084
Log:
YARN-569. Add support for requesting and enforcing preemption requests via
a capacity monitor. Contributed by Carlo Curino, Chris Douglas

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java   (with props)
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java   (with props)
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java   (with props)
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java   (with props)
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java   (with props)
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java   (with props)
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java   (with props)
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Jul 11 01:20:37 2013
@@ -456,6 +456,9 @@ Release 2.1.0-beta - 2013-07-02
 
     YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu)
 
+    YARN-569. Add support for requesting and enforcing preemption requests via
+    a capacity monitor. (Carlo Curino, cdouglas)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java Thu Jul 11 01:20:37 2013
@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.util.Recor
 @Stable
 public abstract class Priority implements Comparable<Priority> {
 
+  public static final Priority UNDEFINED = newInstance(-1);
+
   @Public
   @Stable
   public static Priority newInstance(int p) {

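The new UNDEFINED sentinel is a shared Priority instance with value -1; a minimal sketch of checking for it (the isUndefined helper is hypothetical, not part of this commit):

    import org.apache.hadoop.yarn.api.records.Priority;

    public class PriorityExample {
      // hypothetical helper: treat any negative priority as the sentinel
      static boolean isUndefined(Priority p) {
        return p.getPriority() < 0;
      }

      public static void main(String[] args) {
        Priority p = Priority.UNDEFINED;  // created via newInstance(-1)
        System.out.println(isUndefined(p));                        // true
        System.out.println(isUndefined(Priority.newInstance(0)));  // false
      }
    }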
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Jul 11 01:20:37 2013
@@ -132,6 +132,18 @@ public class YarnConfiguration extends C
     RM_PREFIX + "scheduler.client.thread-count";
   public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
 
+  /**
+   * Enable periodic monitor threads.
+   * @see #RM_SCHEDULER_MONITOR_POLICIES
+   */
+  public static final String RM_SCHEDULER_ENABLE_MONITORS =
+    RM_PREFIX + "scheduler.monitor.enable";
+  public static final boolean DEFAULT_RM_SCHEDULER_ENABLE_MONITORS = false;
+
+  /** List of SchedulingEditPolicy classes affecting the scheduler. */
+  public static final String RM_SCHEDULER_MONITOR_POLICIES =
+    RM_PREFIX + "scheduler.monitor.policies";
+
   /** The address of the RM web application.*/
   public static final String RM_WEBAPP_ADDRESS = 
     RM_PREFIX + "webapp.address";

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Thu Jul 11 01:20:37 2013
@@ -291,6 +291,22 @@
     <value>1000</value>
   </property>
 
+  <property>
+    <description>Enable a set of periodic monitors (specified in
+        yarn.resourcemanager.scheduler.monitor.policies) that affect the
+        scheduler.</description>
+    <name>yarn.resourcemanager.scheduler.monitor.enable</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>The list of SchedulingEditPolicy classes that interact with
+        the scheduler. A particular module may be incompatible with the
+        scheduler, other policies, or a configuration of either.</description>
+    <name>yarn.resourcemanager.scheduler.monitor.policies</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
+  </property>
+
   <!-- Node Manager Configs -->
   <property>
     <description>The hostname of the NM.</description>

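The two properties above can also be set programmatically, e.g. for an embedded RM in a test. A minimal sketch using only the constants introduced by this commit (OBSERVE_ONLY is defined in ProportionalCapacityPreemptionPolicy, below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;

    public class PreemptionConfig {
      static Configuration preemptionEnabledConf() {
        Configuration conf = new YarnConfiguration();
        // turn the periodic monitor framework on (off by default)
        conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
        // load the preemption policy added by this commit
        conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
            ProportionalCapacityPreemptionPolicy.class.getName());
        // dry-run first: compute preemption targets but emit no events
        conf.setBoolean(ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY, true);
        return conf;
      }
    }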
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Thu Jul 11 01:20:37 2013
@@ -411,28 +411,30 @@ public class ApplicationMasterService ex
       allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
       allocateResponse.setAvailableResources(allocation.getResourceLimit());
       
-      AllocateResponse oldResponse =
-          responseMap.put(appAttemptId, allocateResponse);
-      if (oldResponse == null) {
-        // appAttempt got unregistered, remove it back out
-        responseMap.remove(appAttemptId);
-        String message = "App Attempt removed from the cache during allocate"
-            + appAttemptId;
-        LOG.error(message);
-        return resync;
-      }
-      
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
    
       // add preemption to the allocateResponse message (if any)
       allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
-      
+
       // Adding NMTokens for allocated containers.
       if (!allocation.getContainers().isEmpty()) {
         allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
             .createAndGetNMTokens(app.getUser(), appAttemptId,
                 allocation.getContainers()));
       }
+
+      // before returning response, verify in sync
+      AllocateResponse oldResponse =
+          responseMap.put(appAttemptId, allocateResponse);
+      if (oldResponse == null) {
+        // appAttempt got unregistered; take the entry back out
+        responseMap.remove(appAttemptId);
+        String message = "App Attempt removed from the cache during allocate: "
+            + appAttemptId;
+        LOG.error(message);
+        return resync;
+      }
+
       return allocateResponse;
     }
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Jul 11 01:20:37 2013
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -61,9 +63,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -237,6 +243,9 @@ public class ResourceManager extends Com
       throw new RuntimeException("Failed to initialize scheduler", ioe);
     }
 
+    // creating monitors that handle preemption
+    createPolicyMonitors();
+
     masterService = createApplicationMasterService();
     addService(masterService) ;
 
@@ -315,7 +324,8 @@ public class ResourceManager extends Com
     } catch (ClassNotFoundException e) {
       throw new YarnRuntimeException("Could not instantiate Scheduler: "
           + schedulerClassName, e);
-    }  }
+    }
+  }
 
   protected ApplicationMasterLauncher createAMLauncher() {
     return new ApplicationMasterLauncher(this.rmContext);
@@ -477,6 +487,36 @@ public class ResourceManager extends Com
   }
 
   @Private
+  public static final class RMContainerPreemptEventDispatcher
+      implements EventHandler<ContainerPreemptEvent> {
+
+    private final PreemptableResourceScheduler scheduler;
+
+    public RMContainerPreemptEventDispatcher(
+        PreemptableResourceScheduler scheduler) {
+      this.scheduler = scheduler;
+    }
+
+    @Override
+    public void handle(ContainerPreemptEvent event) {
+      ApplicationAttemptId aid = event.getAppId();
+      RMContainer container = event.getContainer();
+      switch (event.getType()) {
+      case DROP_RESERVATION:
+        scheduler.dropContainerReservation(container);
+        break;
+      case PREEMPT_CONTAINER:
+        scheduler.preemptContainer(aid, container);
+        break;
+      case KILL_CONTAINER:
+        scheduler.killContainer(container);
+        break;
+      }
+    }
+  }
+
+  @Private
   public static final class ApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
 
@@ -676,7 +716,37 @@ public class ResourceManager extends Com
   protected ApplicationMasterService createApplicationMasterService() {
     return new ApplicationMasterService(this.rmContext, scheduler);
   }
-  
+
+  protected void createPolicyMonitors() {
+    if (scheduler instanceof PreemptableResourceScheduler
+        && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
+      LOG.info("Loading policy monitors");
+      List<SchedulingEditPolicy> policies = conf.getInstances(
+              YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+              SchedulingEditPolicy.class);
+      if (policies.size() > 0) {
+        this.rmDispatcher.register(ContainerPreemptEventType.class,
+          new RMContainerPreemptEventDispatcher(
+            (PreemptableResourceScheduler) scheduler));
+        for (SchedulingEditPolicy policy : policies) {
+          LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
+          policy.init(conf, this.rmContext.getDispatcher().getEventHandler(),
+              (PreemptableResourceScheduler) scheduler);
+          // periodically check whether we need to take action to guarantee
+          // constraints
+          SchedulingMonitor mon = new SchedulingMonitor(policy);
+          addService(mon);
+        }
+      } else {
+        LOG.warn("Policy monitors configured (" +
+            YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS +
+            ") but none specified (" +
+            YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")");
+      }
+    }
+  }
 
   protected AdminService createAdminService(
       ClientRMService clientRMService, 

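The registration in createPolicyMonitors() means a policy never calls the scheduler's destructive methods directly: it emits ContainerPreemptEvents, and RMContainerPreemptEventDispatcher maps each event type onto the corresponding PreemptableResourceScheduler call. A sketch of the emitting side (the helper class and names are illustrative, not part of this commit):

    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.event.EventHandler;
    import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;

    public class PreemptionEmitExample {
      // ask the AM to vacate the container first; escalate to a forced
      // kill once the grace period (WAIT_TIME_BEFORE_KILL) has expired
      static void requestOrKill(EventHandler<ContainerPreemptEvent> dispatcher,
          ApplicationAttemptId attempt, RMContainer container,
          boolean graceExpired) {
        ContainerPreemptEventType type = graceExpired
            ? ContainerPreemptEventType.KILL_CONTAINER
            : ContainerPreemptEventType.PREEMPT_CONTAINER;
        dispatcher.handle(new ContainerPreemptEvent(attempt, container, type));
      }
    }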
Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java?rev=1502084&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java (added)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java Thu Jul 11 01:20:37 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+
+public interface SchedulingEditPolicy {
+
+  public void init(Configuration config,
+      EventHandler<ContainerPreemptEvent> dispatcher,
+      PreemptableResourceScheduler scheduler);
+
+  /**
+   * This method is invoked at regular intervals. Internally the policy may
+   * track containers and affect the scheduler. The "actions" it performs
+   * are passed back through the EventHandler provided at init time.
+   */
+  public void editSchedule();
+
+  public long getMonitoringInterval();
+
+  public String getPolicyName();
+
+}

Propchange: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

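Any class implementing this interface can be listed under yarn.resourcemanager.scheduler.monitor.policies. A no-op implementation, useful as a smoke test of the monitor wiring, might look like the following (hypothetical class, not part of this commit; the config key is made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.event.EventHandler;
    import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;

    public class NoOpEditPolicy implements SchedulingEditPolicy {
      private long interval;

      @Override
      public void init(Configuration config,
          EventHandler<ContainerPreemptEvent> dispatcher,
          PreemptableResourceScheduler scheduler) {
        // a real policy would retain the dispatcher and scheduler;
        // this one only reads its wake-up interval
        interval = config.getLong("noop.policy.monitoring_interval", 3000);
      }

      @Override
      public void editSchedule() {
        // observe scheduler state, decide, emit ContainerPreemptEvents
      }

      @Override
      public long getMonitoringInterval() {
        return interval;
      }

      @Override
      public String getPolicyName() {
        return "NoOpEditPolicy";
      }
    }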
Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java?rev=1502084&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java (added)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java Thu Jul 11 01:20:37 2013
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class SchedulingMonitor extends AbstractService {
+
+  private final SchedulingEditPolicy scheduleEditPolicy;
+  private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class);
+
+  //thread which wakes up periodically to invoke the configured
+  //scheduling policy
+  private Thread checkerThread;
+  private volatile boolean stopped;
+  private long monitorInterval;
+
+  public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
+    super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
+    this.scheduleEditPolicy = scheduleEditPolicy;
+    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
+  }
+
+  public long getMonitorInterval() {
+    return monitorInterval;
+  }
+
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    assert !stopped : "starting when already stopped";
+    checkerThread = new Thread(new PreemptionChecker());
+    checkerThread.setName(getName());
+    checkerThread.start();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    stopped = true;
+    if (checkerThread != null) {
+      checkerThread.interrupt();
+    }
+    super.serviceStop();
+  }
+
+  @VisibleForTesting
+  public void invokePolicy() {
+    scheduleEditPolicy.editSchedule();
+  }
+
+  private class PreemptionChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        //invoke the preemption policy at a regular pace
+        //the policy will generate preemption or kill events
+        //managed by the dispatcher
+        invokePolicy();
+        try {
+          Thread.sleep(monitorInterval);
+        } catch (InterruptedException e) {
+          LOG.info(getName() + " thread interrupted");
+          break;
+        }
+      }
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
------------------------------------------------------------------------------
    svn:eol-style = native

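Since SchedulingMonitor is a plain AbstractService, it can also be driven outside the ResourceManager, e.g. from a test harness. A sketch, assuming a policy that has already been init()'d with a dispatcher and scheduler (not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
    import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;

    public class MonitorLifecycleExample {
      static void runBriefly(SchedulingEditPolicy policy, Configuration conf)
          throws Exception {
        SchedulingMonitor monitor = new SchedulingMonitor(policy);
        monitor.init(conf);
        monitor.start();  // spawns the PreemptionChecker thread
        // let the policy fire a few times
        Thread.sleep(3 * policy.getMonitoringInterval());
        monitor.stop();   // sets stopped and interrupts the checker
      }
    }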
Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1502084&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (added)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Thu Jul 11 01:20:37 2013
@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class implements a {@link SchedulingEditPolicy} that is designed to
+ * be paired with the {@code CapacityScheduler}. At every invocation of
+ * {@code editSchedule()} it computes the ideal amount of resources to assign
+ * to each queue in the hierarchy, and determines whether preemption is
+ * needed. Overcapacity is distributed among queues in a weighted fair
+ * manner, where the weight is the queue's guaranteed capacity. Based on this
+ * ideal assignment it selects, from each application, a set of containers
+ * that will be killed if the corresponding amount of resources is not freed
+ * up by the application.
+ *
+ * If not in {@code observeOnly} mode, it triggers preemption requests via
+ * {@link ContainerPreemptEvent}s that the {@code ResourceManager} delivers
+ * to the application (or executes directly).
+ *
+ * If the resource deficit persists over a long enough period of time, this
+ * policy triggers forced termination of containers (again by generating
+ * {@link ContainerPreemptEvent}s).
+ */
+public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy {
+
+  private static final Log LOG =
+    LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
+
+  /** If true, run the policy but do not affect the cluster with preemption and
+   * kill events. */
+  public static final String OBSERVE_ONLY =
+      "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+  /** Time in milliseconds between invocations of this policy */
+  public static final String MONITORING_INTERVAL =
+      "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+  /** Time in milliseconds between requesting a preemption from an application
+   * and killing the container. */
+  public static final String WAIT_TIME_BEFORE_KILL =
+      "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+  /** Maximum percentage of resources preempted in a single round. By
+   * controlling this value one can throttle the pace at which containers are
+   * reclaimed from the cluster. After computing the total desired preemption,
+   * the policy scales it back within this limit. */
+  public static final String TOTAL_PREEMPTION_PER_ROUND =
+      "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+  /** Maximum amount of resources above the target capacity ignored for
+   * preemption. This defines a deadzone around the target capacity that helps
+   * prevent thrashing and oscillations around the computed target balance.
+   * High values slow convergence towards guaranteed capacity and, absent
+   * natural completions, may prevent convergence altogether. */
+  public static final String MAX_IGNORED_OVER_CAPACITY =
+    "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+  /**
+   * Given a computed preemption target, account for containers naturally
+   * expiring and preempt only this percentage of the delta. This determines
+   * the rate of geometric convergence into the deadzone ({@link
+   * #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
+   * will reclaim almost 95% of resources within 5 * {@link
+   * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
+  public static final String NATURAL_TERMINATION_FACTOR =
+      "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+
+  //the dispatcher to send preempt and kill events
+  public EventHandler<ContainerPreemptEvent> dispatcher;
+
+  private final Clock clock;
+  private double maxIgnoredOverCapacity;
+  private long maxWaitTime;
+  private CapacityScheduler scheduler;
+  private long monitoringInterval;
+  private final Map<RMContainer,Long> preempted =
+    new HashMap<RMContainer,Long>();
+  private ResourceCalculator rc;
+  private float percentageClusterPreemptionAllowed;
+  private double naturalTerminationFactor;
+  private boolean observeOnly;
+
+  public ProportionalCapacityPreemptionPolicy() {
+    clock = new SystemClock();
+  }
+
+  public ProportionalCapacityPreemptionPolicy(Configuration config,
+      EventHandler<ContainerPreemptEvent> dispatcher,
+      CapacityScheduler scheduler) {
+    this(config, dispatcher, scheduler, new SystemClock());
+  }
+
+  public ProportionalCapacityPreemptionPolicy(Configuration config,
+      EventHandler<ContainerPreemptEvent> dispatcher,
+      CapacityScheduler scheduler, Clock clock) {
+    init(config, dispatcher, scheduler);
+    this.clock = clock;
+  }
+
+  public void init(Configuration config,
+      EventHandler<ContainerPreemptEvent> disp,
+      PreemptableResourceScheduler sched) {
+    LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
+    assert null == scheduler : "Unexpected duplicate call to init";
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException("Class " +
+          sched.getClass().getCanonicalName() + " not instance of " +
+          CapacityScheduler.class.getCanonicalName());
+    }
+    dispatcher = disp;
+    scheduler = (CapacityScheduler) sched;
+    maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
+    naturalTerminationFactor =
+      config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
+    maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
+    monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
+    percentageClusterPreemptionAllowed =
+      config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
+    observeOnly = config.getBoolean(OBSERVE_ONLY, false);
+    rc = scheduler.getResourceCalculator();
+  }
+
+  @Override
+  public void editSchedule(){
+    CSQueue root = scheduler.getRootQueue();
+    Resource clusterResources =
+      Resources.clone(scheduler.getClusterResources());
+    containerBasedPreemptOrKill(root, clusterResources);
+  }
+
+  /**
+   * This method selects and tracks containers to be preempted. If a container
+   * is in the target list for more than maxWaitTime it is killed.
+   *
+   * @param root the root of the CapacityScheduler queue hierarchy
+   * @param clusterResources the total amount of resources in the cluster
+   */
+  private void containerBasedPreemptOrKill(CSQueue root,
+      Resource clusterResources) {
+
+    // extract a summary of the queues from scheduler
+    TempQueue tRoot;
+    synchronized (scheduler) {
+      tRoot = cloneQueues(root, clusterResources);
+    }
+
+    // compute the ideal distribution of resources among queues
+    // updates cloned queues state accordingly
+    tRoot.idealAssigned = tRoot.guaranteed;
+    Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
+        percentageClusterPreemptionAllowed);
+    List<TempQueue> queues =
+      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+
+    // based on ideal allocation select containers to be preempted from each
+    // queue and each application
+    Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
+        getContainersToPreempt(queues, clusterResources);
+
+    logToCSV(queues);
+
+    // if we are in observeOnly mode return before any action is taken
+    if (observeOnly) {
+      return;
+    }
+
+    // preempt (or kill) the selected containers
+    for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
+         : toPreempt.entrySet()) {
+      for (RMContainer container : e.getValue()) {
+        // if we tried to preempt this for more than maxWaitTime
+        if (preempted.get(container) != null &&
+            preempted.get(container) + maxWaitTime < clock.getTime()) {
+          // kill it
+          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
+                ContainerPreemptEventType.KILL_CONTAINER));
+          preempted.remove(container);
+        } else {
+          //otherwise just send preemption events
+          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
+                ContainerPreemptEventType.PREEMPT_CONTAINER));
+          if (preempted.get(container) == null) {
+            preempted.put(container, clock.getTime());
+          }
+        }
+      }
+    }
+
+    // Keep the preempted list clean
+    for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
+      RMContainer id = i.next();
+      // garbage collect containers that are irrelevant for preemption
+      if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
+        i.remove();
+      }
+    }
+  }
+
+  /**
+   * This method recursively computes the ideal assignment of resources to each
+   * level of the hierarchy. This ensures that leaf queues that are
+   * over-capacity but whose parents are within capacity will not be
+   * preempted. Preemptions are allowed within each subtree according to
+   * local over/under capacity.
+   *
+   * @param root the root of the cloned queue hierarchy
+   * @param totalPreemptionAllowed maximum amount of preemption allowed
+   * @return a list of leaf queues updated with preemption targets
+   */
+  private List<TempQueue> recursivelyComputeIdealAssignment(
+      TempQueue root, Resource totalPreemptionAllowed) {
+    List<TempQueue> leafs = new ArrayList<TempQueue>();
+    if (root.getChildren() != null &&
+        root.getChildren().size() > 0) {
+      // compute ideal distribution at this level
+      computeIdealResourceDistribution(rc, root.getChildren(),
+          totalPreemptionAllowed, root.idealAssigned);
+      // compute recursively for lower levels and build list of leafs
+      for(TempQueue t : root.getChildren()) {
+        leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
+      }
+    } else {
+      // we are in a leaf: nothing to do, just return this queue
+      return Collections.singletonList(root);
+    }
+    return leafs;
+  }
+
+  /**
+   * This method computes (for a single level in the tree, passed as a {@code
+   * List<TempQueue>}) the ideal assignment of resources. This is done
+   * iteratively, allocating capacity fairly across all queues with pending
+   * demands. It terminates when no resources are left to assign, or when all
+   * demand is satisfied.
+   *
+   * @param rc resource calculator
+   * @param queues a list of cloned queues to be assigned capacity to (this is
+   * an out param)
+   * @param totalPreemptionAllowed total amount of preemption we allow
+   * @param tot_guarant the amount of capacity assigned to this pool of queues
+   */
+  private void computeIdealResourceDistribution(ResourceCalculator rc,
+      List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
+
+    // qAlloc tracks currently active queues (will decrease progressively as
+    // demand is met)
+    List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
+    // unassigned tracks how much resources are still to assign, initialized
+    // with the total capacity for this set of queues
+    Resource unassigned = Resources.clone(tot_guarant);
+
+    //assign all cluster resources until no more demand, or no resources are left
+    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+          unassigned, Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues
+      resetCapacity(rc, unassigned, qAlloc);
+
+      // offer each queue its guaranteed capacity first and, in following
+      // iterations, its share of the over-capacity
+      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+        TempQueue sub = i.next();
+        Resource wQavail =
+          Resources.multiply(unassigned, sub.normalizedGuarantee);
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+        // a queue that accepts none of the resources offered (wQdone <= 0)
+        // has no unmet demand and is removed from the active list qAlloc
+        if (!Resources.greaterThan(rc, tot_guarant,
+              wQdone, Resources.none())) {
+          i.remove();
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+
+    // based on ideal assignment computed above and current assignment we derive
+    // how much preemption is required overall
+    Resource totPreemptionNeeded = Resource.newInstance(0, 0);
+    for (TempQueue t:queues) {
+      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
+        Resources.addTo(totPreemptionNeeded,
+            Resources.subtract(t.current, t.idealAssigned));
+      }
+    }
+
+    // if we need to preempt more than is allowed, compute a factor (0<f<1)
+    // that is used to scale down how much we ask back from each queue
+    float scalingFactor = 1.0F;
+    if (Resources.greaterThan(rc, tot_guarant,
+          totPreemptionNeeded, totalPreemptionAllowed)) {
+       scalingFactor = Resources.divide(rc, tot_guarant,
+           totalPreemptionAllowed, totPreemptionNeeded);
+    }
+
+    // assign to each queue the amount of actual preemption based on local
+    // information of ideal preemption and scaling factor
+    for (TempQueue t : queues) {
+      t.assignPreemption(scalingFactor, rc, tot_guarant);
+    }
+    if (LOG.isDebugEnabled()) {
+      long time = clock.getTime();
+      for (TempQueue t : queues) {
+        LOG.debug(time + ": " + t);
+      }
+    }
+
+  }
+
+  /**
+   * Computes the normalizedGuarantee of each queue based on active queues
+   * @param rc resource calculator
+   * @param clusterResource the total amount of resources in the cluster
+   * @param queues the list of queues to consider
+   */
+  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
+      List<TempQueue> queues) {
+    Resource activeCap = Resource.newInstance(0, 0);
+    for (TempQueue q : queues) {
+      Resources.addTo(activeCap, q.guaranteed);
+    }
+    for (TempQueue q : queues) {
+      q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+          q.guaranteed, activeCap);
+    }
+  }
+
+  /**
+   * Based on a resource preemption target, drop container reservations and,
+   * if necessary, select containers for preemption from applications in each
+   * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
+   * account for containers that will naturally complete.
+   *
+   * @param queues set of leaf queues to preempt from
+   * @param clusterResource total amount of cluster resources
+   * @return a map of ApplicationAttemptId to the set of containers to preempt
+   */
+  private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
+      List<TempQueue> queues, Resource clusterResource) {
+
+    Map<ApplicationAttemptId,Set<RMContainer>> list =
+        new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+
+    for (TempQueue qT : queues) {
+      // we act only if we are violating balance by more than
+      // maxIgnoredOverCapacity
+      if (Resources.greaterThan(rc, clusterResource, qT.current,
+          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+        // we introduce a dampening factor naturalTerminationFactor that
+        // accounts for natural termination of containers
+        Resource resToObtain =
+          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+
+        // lock the leafqueue while we scan applications and unreserve
+        synchronized(qT.leafQueue) {
+          NavigableSet<FiCaSchedulerApp> ns =
+            (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+          Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+          qT.actuallyPreempted = Resources.clone(resToObtain);
+          while (desc.hasNext()) {
+            FiCaSchedulerApp fc = desc.next();
+            if (Resources.lessThanOrEqual(rc, clusterResource,
+                resToObtain, Resources.none())) {
+              break;
+            }
+            list.put(fc.getApplicationAttemptId(),
+                preemptFrom(fc, clusterResource, resToObtain));
+          }
+        }
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Given a target preemption for a specific application, select containers
+   * to preempt (after unreserving all reservations for that app).
+   *
+   * @param app the application to select containers from
+   * @param clusterResource the total amount of resources in the cluster
+   * @param rsrcPreempt the amount of resources to reclaim from this app
+   * @return the set of containers selected for preemption
+   */
+  private Set<RMContainer> preemptFrom(
+      FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+    Set<RMContainer> ret = new HashSet<RMContainer>();
+    ApplicationAttemptId appId = app.getApplicationAttemptId();
+
+    // first drop reserved containers towards rsrcPreempt
+    List<RMContainer> reservations =
+        new ArrayList<RMContainer>(app.getReservedContainers());
+    for (RMContainer c : reservations) {
+      if (Resources.lessThanOrEqual(rc, clusterResource,
+          rsrcPreempt, Resources.none())) {
+        return ret;
+      }
+      if (!observeOnly) {
+        dispatcher.handle(new ContainerPreemptEvent(appId, c,
+            ContainerPreemptEventType.DROP_RESERVATION));
+      }
+      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
+    }
+
+    // if more resources are to be freed go through all live containers in
+    // reverse priority and reverse allocation order and mark them for
+    // preemption
+    List<RMContainer> containers =
+      new ArrayList<RMContainer>(app.getLiveContainers());
+
+    sortContainers(containers);
+
+    for (RMContainer c : containers) {
+      if (Resources.lessThanOrEqual(rc, clusterResource,
+            rsrcPreempt, Resources.none())) {
+        return ret;
+      }
+      ret.add(c);
+      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
+    }
+
+    return ret;
+  }
+
+  /**
+   * Compare by reversed priority order first, and then reversed containerId
+   * order.
+   * @param containers the list of containers to sort in place
+   */
+  @VisibleForTesting
+  static void sortContainers(List<RMContainer> containers){
+    Collections.sort(containers, new Comparator<RMContainer>() {
+      @Override
+      public int compare(RMContainer a, RMContainer b) {
+        Comparator<Priority> c = new org.apache.hadoop.yarn.server
+            .resourcemanager.resource.Priority.Comparator();
+        int priorityComp = c.compare(b.getContainer().getPriority(),
+                                     a.getContainer().getPriority());
+        if (priorityComp != 0) {
+          return priorityComp;
+        }
+        return b.getContainerId().getId() -
+               a.getContainerId().getId();
+      }
+    });
+  }
+
+  @Override
+  public long getMonitoringInterval() {
+    return monitoringInterval;
+  }
+
+  @Override
+  public String getPolicyName() {
+    return "ProportionalCapacityPreemptionPolicy";
+  }
+
+
+  /**
+   * This method walks a tree of CSQueue and clones the portion of the state
+   * relevant for preemption into TempQueue(s). It also maintains a pointer
+   * to the leaves. Finally, it aggregates pending resources in each queue
+   * and rolls them up to higher levels.
+   *
+   * @param root the root of the CapacityScheduler queue hierarchy
+   * @param clusterResources the total amount of resources in the cluster
+   * @return the root of the cloned queue hierarchy
+   */
+  private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
+    TempQueue ret;
+    synchronized (root) {
+      float absUsed = root.getAbsoluteUsedCapacity();
+      Resource current = Resources.multiply(clusterResources, absUsed);
+      Resource guaranteed =
+        Resources.multiply(clusterResources, root.getAbsoluteCapacity());
+      if (root instanceof LeafQueue) {
+        LeafQueue l = (LeafQueue) root;
+        Resource pending = l.getTotalResourcePending();
+        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        ret.setLeafQueue(l);
+      } else {
+        Resource pending = Resource.newInstance(0, 0);
+        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        for (CSQueue c : root.getChildQueues()) {
+          ret.addChild(cloneQueues(c, clusterResources));
+        }
+      }
+    }
+    return ret;
+  }
+
+  // simple printout function that reports internal queue state (useful for
+  // plotting)
+  private void logToCSV(List<TempQueue> unorderedqueues){
+    List<TempQueue> queues = new ArrayList<TempQueue>(unorderedqueues);
+    Collections.sort(queues, new Comparator<TempQueue>(){
+      @Override
+      public int compare(TempQueue o1, TempQueue o2) {
+        return o1.queueName.compareTo(o2.queueName);
+      }});
+    String queueState = " QUEUESTATE: " + clock.getTime();
+    StringBuilder sb = new StringBuilder();
+    sb.append(queueState);
+    for (TempQueue tq : queues) {
+      sb.append(", ");
+      tq.appendLogString(sb);
+    }
+    LOG.info(sb.toString());
+  }
+
+  /**
+   * Temporary data-structure tracking resource availability, pending resource
+   * need, and current utilization. Used to clone {@link CSQueue}.
+   */
+  static class TempQueue {
+    final String queueName;
+    final Resource current;
+    final Resource pending;
+    final Resource guaranteed;
+    Resource idealAssigned;
+    Resource toBePreempted;
+    Resource actuallyPreempted;
+
+    double normalizedGuarantee;
+
+    final ArrayList<TempQueue> children;
+    LeafQueue leafQueue;
+
+    TempQueue(String queueName, Resource current, Resource pending,
+        Resource guaranteed) {
+      this.queueName = queueName;
+      this.current = current;
+      this.pending = pending;
+      this.guaranteed = guaranteed;
+      this.idealAssigned = Resource.newInstance(0, 0);
+      this.actuallyPreempted = Resource.newInstance(0, 0);
+      this.toBePreempted = Resource.newInstance(0, 0);
+      this.normalizedGuarantee = Float.NaN;
+      this.children = new ArrayList<TempQueue>();
+    }
+
+    public void setLeafQueue(LeafQueue l){
+      assert children.size() == 0;
+      this.leafQueue = l;
+    }
+
+    /**
+     * When adding a child we also aggregate its pending resource needs.
+     * @param q the child queue to add to this queue
+     */
+    public void addChild(TempQueue q) {
+      assert leafQueue == null;
+      children.add(q);
+      Resources.addTo(pending, q.pending);
+    }
+
+    public void addChildren(ArrayList<TempQueue> queues) {
+      assert leafQueue == null;
+      children.addAll(queues);
+    }
+
+
+    public ArrayList<TempQueue> getChildren(){
+      return children;
+    }
+
+    // This function "accepts" all the resources it can (pending) and returns
+    // the unused ones
+    Resource offer(Resource avail, ResourceCalculator rc,
+        Resource clusterResource) {
+      // remain = avail - min(avail, current + pending - assigned)
+      Resource accepted = Resources.min(rc, clusterResource,
+          avail,
+          Resources.subtract(
+              Resources.add(current, pending),
+              idealAssigned));
+      Resource remain = Resources.subtract(avail, accepted);
+      Resources.addTo(idealAssigned, accepted);
+      return remain;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("CUR: ").append(current)
+        .append(" PEN: ").append(pending)
+        .append(" GAR: ").append(guaranteed)
+        .append(" NORM: ").append(normalizedGuarantee)
+        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
+        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
+        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+
+      return sb.toString();
+    }
+    public void assignPreemption(float scalingFactor,
+        ResourceCalculator rc, Resource clusterResource) {
+      if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
+          toBePreempted = Resources.multiply(
+              Resources.subtract(current, idealAssigned), scalingFactor);
+      } else {
+        toBePreempted = Resource.newInstance(0, 0);
+      }
+    }
+
+    void appendLogString(StringBuilder sb) {
+      sb.append(queueName).append(", ")
+        .append(current.getMemory()).append(", ")
+        .append(current.getVirtualCores()).append(", ")
+        .append(pending.getMemory()).append(", ")
+        .append(pending.getVirtualCores()).append(", ")
+        .append(guaranteed.getMemory()).append(", ")
+        .append(guaranteed.getVirtualCores()).append(", ")
+        .append(idealAssigned.getMemory()).append(", ")
+        .append(idealAssigned.getVirtualCores()).append(", ")
+        .append(toBePreempted.getMemory()).append(", ")
+        .append(toBePreempted.getVirtualCores() ).append(", ")
+        .append(actuallyPreempted.getMemory()).append(", ")
+        .append(actuallyPreempted.getVirtualCores());
+    }
+
+  }
+
+}

Propchange: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

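To make the ideal-assignment arithmetic concrete: take two sibling queues on a 100-unit cluster, A guaranteed 60 and using 90 with nothing pending, B guaranteed 40 and using 20 with 30 pending. A scalar walk-through of one round of computeIdealResourceDistribution (toy numbers, not the actual TempQueue code):

    public class IdealAssignmentExample {
      public static void main(String[] args) {
        // each queue accepts min(avail, current + pending - idealAssigned),
        // mirroring TempQueue.offer(), and returns the rest
        int idealA = 0, idealB = 0;
        int offerA = 60, offerB = 40;  // normalizedGuarantee * unassigned (100)
        idealA += Math.min(offerA, 90 + 0  - idealA);  // A accepts its full 60
        idealB += Math.min(offerB, 20 + 30 - idealB);  // B wants 50, accepts 40
        // unassigned is now 0, so the loop ends: idealA=60, idealB=40.
        // A is 30 over its ideal; TOTAL_PREEMPTION_PER_ROUND (0.1 of 100)
        // caps the round at 10, so scalingFactor = 10/30 and A's
        // toBePreempted is 10; NATURAL_TERMINATION_FACTOR (0.2) then dampens
        // this in getContainersToPreempt(), reclaiming ~2 units this round.
        System.out.println("idealA=" + idealA + ", idealB=" + idealB);
      }
    }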
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Thu Jul 11 01:20:37 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -205,6 +206,14 @@ public class AppSchedulingInfo {
     return requests.get(priority);
   }
 
+  synchronized public List<ResourceRequest> getAllResourceRequests() {
+    List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
+    for (Map<String, ResourceRequest> r : requests.values()) {
+      ret.addAll(r.values());
+    }
+    return ret;
+  }
+
   synchronized public ResourceRequest getResourceRequest(Priority priority,
       String resourceName) {
     Map<String, ResourceRequest> nodeRequests = requests.get(priority);

Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java?rev=1502084&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java (added)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java Thu Jul 11 01:20:37 2013
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * Simple event class used to communicate container unreservation, preemption,
+ * and kill requests.
+ */
+public class ContainerPreemptEvent
+    extends AbstractEvent<ContainerPreemptEventType> {
+
+  private final ApplicationAttemptId aid;
+  private final RMContainer container;
+
+  public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
+      ContainerPreemptEventType type) {
+    super(type);
+    this.aid = aid;
+    this.container = container;
+  }
+
+  public RMContainer getContainer(){
+    return this.container;
+  }
+
+  public ApplicationAttemptId getAppId() {
+    return aid;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.append(" ").append(getAppId());
+    sb.append(" ").append(getContainer().getContainerId());
+    return sb.toString();
+  }
+
+}

Propchange: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java?rev=1502084&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java (added)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java Thu Jul 11 01:20:37 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+public enum ContainerPreemptEventType {
+
+  DROP_RESERVATION,
+  PREEMPT_CONTAINER,
+  KILL_CONTAINER
+
+}

Propchange: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
------------------------------------------------------------------------------
    svn:eol-style = native
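
Together, the event class and the type enum above give the capacity monitor one uniform channel for requesting an action from the RM. A hedged sketch of emitting such an event; the dispatcher, appAttemptId and rmContainer references are assumptions for illustration, not wiring shown in this hunk:

    // Illustrative only: given an EventHandler<ContainerPreemptEvent>
    // dispatcher (assumed to exist), request polite preemption; a policy
    // would escalate to KILL_CONTAINER once its grace period expires.
    dispatcher.handle(new ContainerPreemptEvent(
        appAttemptId, rmContainer,
        ContainerPreemptEventType.PREEMPT_CONTAINER));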

Added: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java?rev=1502084&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java (added)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java Thu Jul 11 01:20:37 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * Interface for a scheduler that supports preemption/killing
+ *
+ */
+public interface PreemptableResourceScheduler extends ResourceScheduler {
+
+  /**
+   * If the scheduler supports container reservations, this method is used to
+   * ask the scheduler to drop the reservation for the given container.
+   * @param container Reference to reserved container allocation.
+   */
+  void dropContainerReservation(RMContainer container);
+
+  /**
+   * Ask the scheduler to reclaim a container from a specific application
+   * by issuing a preemption request.
+   * @param aid the application from which to reclaim the container
+   * @param container the container to reclaim
+   */
+  void preemptContainer(ApplicationAttemptId aid, RMContainer container);
+
+  /**
+   * Ask the scheduler to forcibly interrupt the container given as input.
+   * @param container the container to kill
+   */
+  void killContainer(RMContainer container);
+
+}

Propchange: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native
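
A consumer of this interface typically escalates through the three methods in cost order: dropping a reservation is free (nothing is running yet), preemptContainer asks the AM to yield, and killContainer forcibly reclaims. A minimal sketch under that assumption; the ordering and the graceExpired flag are illustrative, not the exact ProportionalCapacityPreemptionPolicy logic:

    // Illustrative escalation over the PreemptableResourceScheduler contract.
    void reclaim(PreemptableResourceScheduler scheduler,
        ApplicationAttemptId aid, RMContainer c, boolean graceExpired) {
      if (c.getState() == RMContainerState.RESERVED) {
        scheduler.dropContainerReservation(c);  // nothing running yet
      } else if (!graceExpired) {
        scheduler.preemptContainer(aid, c);     // give the AM a chance to yield
      } else {
        scheduler.killContainer(c);             // forcibly reclaim
      }
    }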

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Jul 11 01:20:37 2013
@@ -61,8 +61,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -83,8 +83,9 @@ import org.apache.hadoop.yarn.util.resou
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class CapacityScheduler 
-implements ResourceScheduler, CapacitySchedulerContext, Configurable {
+public class CapacityScheduler
+  implements PreemptableResourceScheduler, CapacitySchedulerContext,
+             Configurable {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
@@ -525,8 +526,8 @@ implements ResourceScheduler, CapacitySc
     
     // Sanity check
     SchedulerUtils.normalizeRequests(
-        ask, calculator, getClusterResources(), minimumAllocation,
-        maximumAllocation);
+        ask, getResourceCalculator(), getClusterResources(),
+        getMinimumResourceCapability(), maximumAllocation);
 
     // Release containers
     for (ContainerId releasedContainerId : release) {
@@ -578,9 +579,8 @@ implements ResourceScheduler, CapacitySc
           " #ask=" + ask.size());
       }
 
-      return new Allocation(
-          application.pullNewlyAllocatedContainers(), 
-          application.getHeadroom());
+      return application.getAllocation(getResourceCalculator(),
+                   clusterResource, getMinimumResourceCapability());
     }
   }
 
@@ -812,7 +812,8 @@ implements ResourceScheduler, CapacitySc
     Container container = rmContainer.getContainer();
     
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
+    ApplicationAttemptId applicationAttemptId =
+      container.getId().getApplicationAttemptId();
     FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Container " + container + " of" +
@@ -869,5 +870,41 @@ implements ResourceScheduler, CapacitySc
     FiCaSchedulerNode node = getNode(nodeId);
     return node == null ? null : new SchedulerNodeReport(node);
   }
-  
+
+  @Override
+  public void dropContainerReservation(RMContainer container) {
+    if(LOG.isDebugEnabled()){
+      LOG.debug("DROP_RESERVATION:" + container.toString());
+    }
+    completedContainer(container,
+        SchedulerUtils.createAbnormalContainerStatus(
+            container.getContainerId(),
+            SchedulerUtils.UNRESERVED_CONTAINER),
+        RMContainerEventType.KILL);
+  }
+
+  @Override
+  public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
+    if(LOG.isDebugEnabled()){
+      LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
+          " container: " + cont.toString());
+    }
+    FiCaSchedulerApp app = applications.get(aid);
+    if (app != null) {
+      app.addPreemptContainer(cont.getContainerId());
+    }
+  }
+
+  @Override
+  public void killContainer(RMContainer cont) {
+    if(LOG.isDebugEnabled()){
+      LOG.debug("KILL_CONTAINER: container" + cont.toString());
+    }
+    completedContainer(cont,
+        SchedulerUtils.createAbnormalContainerStatus(
+            cont.getContainerId(),"Container being forcibly preempted:"
+        + cont.getContainerId()),
+        RMContainerEventType.KILL);
+  }
+
 }
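
These three overrides are reached by routing ContainerPreemptEvents to the scheduler. A sketch of that routing; the handler method and the cast used to obtain the scheduler are assumptions, not code shown in this hunk:

    // Hypothetical router from monitor events to the scheduler contract.
    public void handle(ContainerPreemptEvent event) {
      PreemptableResourceScheduler s =
          (PreemptableResourceScheduler) scheduler; // assumed field
      switch (event.getType()) {
        case DROP_RESERVATION:
          s.dropContainerReservation(event.getContainer());
          break;
        case PREEMPT_CONTAINER:
          s.preemptContainer(event.getAppId(), event.getContainer());
          break;
        case KILL_CONTAINER:
          s.killContainer(event.getContainer());
          break;
      }
    }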

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Jul 11 01:20:37 2013
@@ -1390,18 +1390,20 @@ public class LeafQueue implements CSQueu
     node.reserveResource(application, priority, rmContainer);
   }
 
-  private void unreserve(FiCaSchedulerApp application, Priority priority, 
+  private boolean unreserve(FiCaSchedulerApp application, Priority priority,
       FiCaSchedulerNode node, RMContainer rmContainer) {
     // Done with the reservation?
-    application.unreserve(node, priority);
-    node.unreserveResource(application);
-      
+    if (application.unreserve(node, priority)) {
+      node.unreserveResource(application);
+
       // Update reserved metrics
-    getMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+      getMetrics().unreserveResource(
+          application.getUser(), rmContainer.getContainer().getResource());
+      return true;
+    }
+    return false;
   }
 
-
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
@@ -1411,37 +1413,40 @@ public class LeafQueue implements CSQueu
       synchronized (this) {
 
         Container container = rmContainer.getContainer();
-        
+
+        boolean removed = false;
         // Inform the application & the node
         // Note: It's safe to assume that all state changes to RMContainer
         // happen under scheduler's lock... 
         // So, this is, in effect, a transaction across application & node
         if (rmContainer.getState() == RMContainerState.RESERVED) {
-          unreserve(application, rmContainer.getReservedPriority(), 
+          removed = unreserve(application, rmContainer.getReservedPriority(),
               node, rmContainer);
         } else {
-          application.containerCompleted(rmContainer, containerStatus, event);
+          removed =
+            application.containerCompleted(rmContainer, containerStatus, event);
           node.releaseContainer(container);
         }
 
-
         // Book-keeping
-        releaseResource(clusterResource, 
-            application, container.getResource());
-
-        LOG.info("completedContainer" +
-            " container=" + container +
-            " resource=" + container.getResource() +
-        		" queue=" + this + 
-            " usedCapacity=" + getUsedCapacity() +
-            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + usedResources + 
-            " cluster=" + clusterResource);
+        if (removed) {
+          releaseResource(clusterResource,
+              application, container.getResource());
+          LOG.info("completedContainer" +
+              " container=" + container +
+              " resource=" + container.getResource() +
+              " queue=" + this +
+              " usedCapacity=" + getUsedCapacity() +
+              " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
+              " used=" + usedResources +
+              " cluster=" + clusterResource);
+          // Inform the parent queue
+          getParent().completedContainer(clusterResource, application,
+              node, rmContainer, null, event);
+        }
       }
 
-      // Inform the parent queue
-      getParent().completedContainer(clusterResource, application, 
-          node, rmContainer, null, event);
+
     }
   }
 
@@ -1588,5 +1593,19 @@ public class LeafQueue implements CSQueu
     getParent().recoverContainer(clusterResource, application, container);
 
   }
-  
+
+  // the preemption monitor needs access to the list of active apps
+  public Set<FiCaSchedulerApp> getApplications() {
+    return Collections.unmodifiableSet(activeApplications);
+  }
+
+  // return a single Resource capturing the overall amount of pending resources
+  public Resource getTotalResourcePending() {
+    Resource ret = BuilderUtils.newResource(0, 0);
+    for (FiCaSchedulerApp f : activeApplications) {
+      Resources.addTo(ret, f.getTotalPendingRequests());
+    }
+    return ret;
+  }
+
 }
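
getTotalResourcePending() gives the monitor a per-queue demand signal when deciding how much of a queue's guarantee is actually claimable; it pairs with the ANY-only filter in FiCaSchedulerApp.getTotalPendingRequests() below to avoid counting the same ask at several localities. An illustrative consumer (rc, clusterResource and leafQueue are assumed to be in scope):

    // Illustrative: a queue with unmet demand is a candidate to receive
    // capacity reclaimed from over-capacity queues.
    Resource pending = leafQueue.getTotalResourcePending();
    if (Resources.greaterThan(rc, clusterResource, pending,
        Resources.none())) {
      // queue is starved; include it when redistributing reclaimed capacity
    }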

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1502084&r1=1502083&r2=1502084&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Thu Jul 11 01:20:37 2013
@@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +41,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@@ -53,11 +55,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
@@ -85,17 +89,19 @@ public class FiCaSchedulerApp extends Sc
   private Resource resourceLimit = recordFactory
       .newRecordInstance(Resource.class);
 
-  private Map<ContainerId, RMContainer> liveContainers
-  = new HashMap<ContainerId, RMContainer>();
-  private List<RMContainer> newlyAllocatedContainers = 
-      new ArrayList<RMContainer>();
+  private Map<ContainerId, RMContainer> liveContainers =
+    new HashMap<ContainerId, RMContainer>();
+  private List<RMContainer> newlyAllocatedContainers =
+    new ArrayList<RMContainer>();
 
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
 
   private boolean isStopped = false;
 
-  
+  private final Set<ContainerId> containersToPreempt =
+    new HashSet<ContainerId>();
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -219,12 +225,17 @@ public class FiCaSchedulerApp extends Sc
       RMContainerEventType.LAUNCHED));
   }
 
-  synchronized public void containerCompleted(RMContainer rmContainer,
+  synchronized public boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
-    
+
+    // Remove from the list of containers
+    if (null == liveContainers.remove(rmContainer.getContainerId())) {
+      return false;
+    }
+
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
-    
+
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(
@@ -234,9 +245,8 @@ public class FiCaSchedulerApp extends Sc
         );
     LOG.info("Completed container: " + rmContainer.getContainerId() + 
         " in state: " + rmContainer.getState() + " event:" + event);
-    
-    // Remove from the list of containers
-    liveContainers.remove(rmContainer.getContainerId());
+
+    containersToPreempt.remove(rmContainer.getContainerId());
 
     RMAuditLogger.logSuccess(getUser(), 
         AuditConstants.RELEASE_CONTAINER, "SchedulerApp", 
@@ -246,6 +256,8 @@ public class FiCaSchedulerApp extends Sc
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
+
+    return true;
   }
 
   synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
@@ -345,7 +357,8 @@ public class FiCaSchedulerApp extends Sc
   }
   
   /**
-   * Return the number of times the application has been given an opportunity
+   * @param priority Target priority
+   * @return the number of times the application has been given an opportunity
    * to schedule a task at the given priority since the last time it
    * successfully did so.
    */
@@ -419,33 +432,36 @@ public class FiCaSchedulerApp extends Sc
     return rmContainer;
   }
 
-  public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
-    if (reservedContainers.isEmpty()) {
-      this.reservedContainers.remove(priority);
+  public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers =
+      this.reservedContainers.get(priority);
+
+    if (reservedContainers != null) {
+      RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+
+      // unreserve is now triggered in new scenarios (e.g. preemption); as a
+      // consequence reservedContainer might be null, hence the null checks
+      if (reservedContainer != null
+          && reservedContainer.getContainer() != null
+          && reservedContainer.getContainer().getResource() != null) {
+
+        if (reservedContainers.isEmpty()) {
+          this.reservedContainers.remove(priority);
+        }
+        // Reset the re-reservation count
+        resetReReservations(priority);
+
+        Resource resource = reservedContainer.getContainer().getResource();
+        Resources.subtractFrom(currentReservation, resource);
+
+        LOG.info("Application " + getApplicationId() + " unreserved "
+            + " on node " + node + ", currently has " + reservedContainers.size()
+            + " at priority " + priority + "; currentReservation "
+            + currentReservation);
+        return true;
+      }
     }
-    
-    // reservedContainer should not be null here
-    if (reservedContainer == null) {
-      String errorMesssage =
-          "Application " + getApplicationId() + " is trying to unreserve "
-              + " on node " + node + ", currently has "
-              + reservedContainers.size() + " at priority " + priority
-              + "; currentReservation " + currentReservation;
-      LOG.warn(errorMesssage);
-      throw new YarnRuntimeException(errorMesssage);
-    }
-    // Reset the re-reservation count
-    resetReReservations(priority);
-
-    Resource resource = reservedContainer.getContainer().getResource();
-    Resources.subtractFrom(currentReservation, resource);
-
-    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size() + " at priority "
-        + priority + "; currentReservation " + currentReservation);
+    return false;
   }
 
   /**
@@ -509,4 +525,55 @@ public class FiCaSchedulerApp extends Sc
   public Queue getQueue() {
     return queue;
   }
+
+  public Resource getTotalPendingRequests() {
+    Resource ret = Resource.newInstance(0, 0);
+    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+      // to avoid double counting we count only "ANY" resource requests
+      if (ResourceRequest.isAnyLocation(rr.getResourceName())){
+        Resources.addTo(ret,
+            Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+      }
+    }
+    return ret;
+  }
+
+  public synchronized void addPreemptContainer(ContainerId cont){
+    // ignore already completed containers
+    if (liveContainers.containsKey(cont)) {
+      containersToPreempt.add(cont);
+    }
+  }
+
+  /**
+   * This method produces an Allocation that includes the current view
+   * of the resources that will be allocated to and preempted from this
+   * application.
+   *
+   * @param rc the ResourceCalculator in use
+   * @param clusterResource the total cluster capacity
+   * @param minimumAllocation the minimum container allocation
+   * @return the resulting Allocation, including the containers to preempt
+   */
+  public synchronized Allocation getAllocation(ResourceCalculator rc,
+      Resource clusterResource, Resource minimumAllocation) {
+
+    Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
+        new HashSet<ContainerId>(containersToPreempt));
+    containersToPreempt.clear();
+    Resource tot = Resource.newInstance(0, 0);
+    for(ContainerId c : currentContPreemption){
+      Resources.addTo(tot,
+          liveContainers.get(c).getContainer().getResource());
+    }
+    int numCont = (int) Math.ceil(
+        Resources.divide(rc, clusterResource, tot, minimumAllocation));
+    ResourceRequest rr = ResourceRequest.newInstance(
+        Priority.UNDEFINED, ResourceRequest.ANY,
+        minimumAllocation, numCont);
+    return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(),
+                          null, currentContPreemption,
+                          Collections.singletonList(rr));
+  }
+
 }
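
The ResourceRequest attached to the Allocation expresses preempted capacity as a count of minimum-size containers the AM may re-request: the resources of all containers marked for preemption are summed, then rounded up against minimumAllocation. A worked instance of that arithmetic, with illustrative values (rc and clusterResource as in getAllocation's parameters):

    // Illustrative: containers of 3GB and 2GB marked for preemption,
    // minimumAllocation = 2GB / 1 vcore.
    Resource tot = Resource.newInstance(5 * 1024, 2);
    Resource min = Resource.newInstance(2 * 1024, 1);
    // With the default (memory-based) calculator: 5120 / 2048 = 2.5,
    // so numCont = ceil(2.5) = 3 minimum-size containers.
    int numCont = (int) Math.ceil(
        Resources.divide(rc, clusterResource, tot, min));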


