Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EE2FE18806 for ; Mon, 6 Jul 2015 20:57:38 +0000 (UTC) Received: (qmail 55866 invoked by uid 500); 6 Jul 2015 20:57:26 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 55169 invoked by uid 500); 6 Jul 2015 20:57:26 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 53188 invoked by uid 99); 6 Jul 2015 20:57:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Jul 2015 20:57:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C5912DFE19; Mon, 6 Jul 2015 20:57:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjshen@apache.org To: common-commits@hadoop.apache.org Date: Mon, 06 Jul 2015 20:57:58 -0000 Message-Id: <9311d1db5ee44a589ccce06363bfd89e@git.apache.org> In-Reply-To: <6162da200e644fdeabf9f916da3b92b4@git.apache.org> References: <6162da200e644fdeabf9f916da3b92b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [36/48] hadoop git commit: YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda) YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d5192ca9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d5192ca9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d5192ca9 Branch: refs/heads/YARN-2928 Commit: d5192ca993d8c76fe0b51c3a91967fb0c2bbf0a1 Parents: 2820f1f Author: Wangda Tan Authored: Wed Jul 1 17:32:22 2015 -0700 Committer: Zhijie Shen Committed: Mon Jul 6 11:32:00 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../server/resourcemanager/ResourceManager.java | 34 --------- .../monitor/SchedulingEditPolicy.java | 6 +- .../monitor/SchedulingMonitor.java | 3 +- .../ProportionalCapacityPreemptionPolicy.java | 42 ++++++----- .../scheduler/ContainerPreemptEvent.java | 8 +- .../scheduler/ContainerPreemptEventType.java | 26 ------- .../scheduler/capacity/CapacityScheduler.java | 24 ++++++ .../scheduler/event/SchedulerEventType.java | 7 +- .../resourcemanager/TestRMDispatcher.java | 79 ++++++++++++++++++++ ...estProportionalCapacityPreemptionPolicy.java | 19 +++-- ...pacityPreemptionPolicyForNodePartitions.java | 12 ++- 12 files changed, 161 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c4c7fa9..3e42e61 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -704,6 +704,9 @@ Release 2.7.2 - UNRELEASED YARN-3793. Several NPEs when deleting local files on NM recovery (Varun Saxena via jlowe) + YARN-3508. Prevent processing preemption events on the main RM dispatcher. + (Varun Saxena via wangda) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 3302320..b235058 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; @@ -624,9 +623,6 @@ public class ResourceManager extends CompositeService implements Recoverable { YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, SchedulingEditPolicy.class); if (policies.size() > 0) { - rmDispatcher.register(ContainerPreemptEventType.class, - new RMContainerPreemptEventDispatcher( - (PreemptableResourceScheduler) scheduler)); for (SchedulingEditPolicy policy : policies) { LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); // periodically check whether we need to take action to guarantee @@ -797,36 +793,6 @@ public class ResourceManager extends CompositeService implements Recoverable { } @Private - public static final class - RMContainerPreemptEventDispatcher - implements EventHandler { - - private final PreemptableResourceScheduler scheduler; - - public RMContainerPreemptEventDispatcher( - PreemptableResourceScheduler scheduler) { - this.scheduler = scheduler; - } - - @Override - public void handle(ContainerPreemptEvent event) { - ApplicationAttemptId aid = event.getAppId(); - RMContainer container = event.getContainer(); - switch (event.getType()) { - case DROP_RESERVATION: - scheduler.dropContainerReservation(container); - break; - case PREEMPT_CONTAINER: - scheduler.preemptContainer(aid, container); - break; - case KILL_CONTAINER: - scheduler.killContainer(container); - break; - } - } - } - - @Private public static final class ApplicationAttemptEventDispatcher implements EventHandler { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java index 1ebc19f..0d587d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java @@ -18,14 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; public interface SchedulingEditPolicy { - public void init(Configuration config, - EventHandler dispatcher, + public void init(Configuration config, RMContext context, PreemptableResourceScheduler scheduler); /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 1682f7d..d4c129b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -54,9 +54,8 @@ public class SchedulingMonitor extends AbstractService { return scheduleEditPolicy; } - @SuppressWarnings("unchecked") public void serviceInit(Configuration conf) throws Exception { - scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(), + scheduleEditPolicy.init(conf, rmContext, (PreemptableResourceScheduler) rmContext.getScheduler()); this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); super.serviceInit(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 1f47b5f..5a20a6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -38,13 +38,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -118,8 +118,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic public static final String NATURAL_TERMINATION_FACTOR = "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; - // the dispatcher to send preempt and kill events - public EventHandler dispatcher; + private RMContext rmContext; private final Clock clock; private double maxIgnoredOverCapacity; @@ -141,20 +140,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } public ProportionalCapacityPreemptionPolicy(Configuration config, - EventHandler dispatcher, - CapacityScheduler scheduler) { - this(config, dispatcher, scheduler, new SystemClock()); + RMContext context, CapacityScheduler scheduler) { + this(config, context, scheduler, new SystemClock()); } public ProportionalCapacityPreemptionPolicy(Configuration config, - EventHandler dispatcher, - CapacityScheduler scheduler, Clock clock) { - init(config, dispatcher, scheduler); + RMContext context, CapacityScheduler scheduler, Clock clock) { + init(config, context, scheduler); this.clock = clock; } - public void init(Configuration config, - EventHandler disp, + public void init(Configuration config, RMContext context, PreemptableResourceScheduler sched) { LOG.info("Preemption monitor:" + this.getClass().getCanonicalName()); assert null == scheduler : "Unexpected duplicate call to init"; @@ -163,7 +159,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic sched.getClass().getCanonicalName() + " not instance of " + CapacityScheduler.class.getCanonicalName()); } - dispatcher = disp; + rmContext = context; scheduler = (CapacityScheduler) sched; maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1); naturalTerminationFactor = @@ -196,6 +192,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param root the root of the CapacityScheduler queue hierarchy * @param clusterResources the total amount of resources in the cluster */ + @SuppressWarnings("unchecked") private void containerBasedPreemptOrKill(CSQueue root, Resource clusterResources) { // All partitions to look at @@ -248,8 +245,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // preempt (or kill) the selected containers for (Map.Entry> e : toPreempt.entrySet()) { + ApplicationAttemptId appAttemptId = e.getKey(); if (LOG.isDebugEnabled()) { - LOG.debug("Send to scheduler: in app=" + e.getKey() + LOG.debug("Send to scheduler: in app=" + appAttemptId + " #containers-to-be-preempted=" + e.getValue().size()); } for (RMContainer container : e.getValue()) { @@ -257,13 +255,15 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic if (preempted.get(container) != null && preempted.get(container) + maxWaitTime < clock.getTime()) { // kill it - dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container, - ContainerPreemptEventType.KILL_CONTAINER)); + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.KILL_CONTAINER)); preempted.remove(container); } else { //otherwise just send preemption events - dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container, - ContainerPreemptEventType.PREEMPT_CONTAINER)); + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.PREEMPT_CONTAINER)); if (preempted.get(container) == null) { preempted.put(container, clock.getTime()); } @@ -735,6 +735,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * Given a target preemption for a specific application, select containers * to preempt (after unreserving all reservation for that app). */ + @SuppressWarnings("unchecked") private void preemptFrom(FiCaSchedulerApp app, Resource clusterResource, Map resToObtainByPartition, List skippedAMContainerlist, Resource skippedAMSize, @@ -758,8 +759,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic clusterResource, preemptMap); if (!observeOnly) { - dispatcher.handle(new ContainerPreemptEvent(appId, c, - ContainerPreemptEventType.DROP_RESERVATION)); + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent( + appId, c, SchedulerEventType.DROP_RESERVATION)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java index 8eba48d..7ab2758 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java @@ -19,20 +19,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; /** * Simple event class used to communicate containers unreservations, preemption, killing */ -public class ContainerPreemptEvent - extends AbstractEvent { +public class ContainerPreemptEvent extends SchedulerEvent { private final ApplicationAttemptId aid; private final RMContainer container; public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container, - ContainerPreemptEventType type) { + SchedulerEventType type) { super(type); this.aid = aid; this.container = container; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java deleted file mode 100644 index a70a836..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler; - -public enum ContainerPreemptEventType { - - DROP_RESERVATION, - PREEMPT_CONTAINER, - KILL_CONTAINER - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index f1d0f9c..141aa7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -1346,6 +1347,29 @@ public class CapacityScheduler extends RMContainerEventType.EXPIRE); } break; + case DROP_RESERVATION: + { + ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event; + RMContainer container = dropReservationEvent.getContainer(); + dropContainerReservation(container); + } + break; + case PREEMPT_CONTAINER: + { + ContainerPreemptEvent preemptContainerEvent = + (ContainerPreemptEvent)event; + ApplicationAttemptId aid = preemptContainerEvent.getAppId(); + RMContainer containerToBePreempted = preemptContainerEvent.getContainer(); + preemptContainer(aid, containerToBePreempted); + } + break; + case KILL_CONTAINER: + { + ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; + RMContainer containerToBeKilled = killContainerEvent.getContainer(); + killContainer(containerToBeKilled); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 13aecb3..9de935b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -36,5 +36,10 @@ public enum SchedulerEventType { APP_ATTEMPT_REMOVED, // Source: ContainerAllocationExpirer - CONTAINER_EXPIRED + CONTAINER_EXPIRED, + + // Source: SchedulingEditPolicy + DROP_RESERVATION, + PREEMPT_CONTAINER, + KILL_CONTAINER } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java new file mode 100644 index 0000000..db7c96a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.junit.Assert; +import org.junit.Test; + +public class TestRMDispatcher { + + @SuppressWarnings("unchecked") + @Test(timeout=10000) + public void testSchedulerEventDispatcherForPreemptionEvents() { + AsyncDispatcher rmDispatcher = new AsyncDispatcher(); + CapacityScheduler sched = spy(new CapacityScheduler()); + YarnConfiguration conf = new YarnConfiguration(); + SchedulerEventDispatcher schedulerDispatcher = + new SchedulerEventDispatcher(sched); + rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); + rmDispatcher.init(conf); + rmDispatcher.start(); + schedulerDispatcher.init(conf); + schedulerDispatcher.start(); + try { + ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); + RMContainer container = mock(RMContainer.class); + ContainerPreemptEvent event1 = new ContainerPreemptEvent( + appAttemptId, container, SchedulerEventType.DROP_RESERVATION); + rmDispatcher.getEventHandler().handle(event1); + ContainerPreemptEvent event2 = new ContainerPreemptEvent( + appAttemptId, container, SchedulerEventType.KILL_CONTAINER); + rmDispatcher.getEventHandler().handle(event2); + ContainerPreemptEvent event3 = new ContainerPreemptEvent( + appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER); + rmDispatcher.getEventHandler().handle(event3); + // Wait for events to be processed by scheduler dispatcher. + Thread.sleep(1000); + verify(sched, times(3)).handle(any(SchedulerEvent.class)); + verify(sched).dropContainerReservation(container); + verify(sched).preemptContainer(appAttemptId, container); + verify(sched).killContainer(container); + } catch (InterruptedException e) { + Assert.fail(); + } finally { + schedulerDispatcher.stop(); + rmDispatcher.stop(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 6c0ed6c..2c0c6d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,8 +23,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -66,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -76,6 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -104,7 +106,7 @@ public class TestProportionalCapacityPreemptionPolicy { RMContext rmContext = null; RMNodeLabelsManager lm = null; CapacitySchedulerConfiguration schedConf = null; - EventHandler mDisp = null; + EventHandler mDisp = null; ResourceCalculator rc = new DefaultResourceCalculator(); Resource clusterResources = null; final ApplicationAttemptId appA = ApplicationAttemptId.newInstance( @@ -164,6 +166,9 @@ public class TestProportionalCapacityPreemptionPolicy { when(mCS.getRMContext()).thenReturn(rmContext); when(rmContext.getNodeLabelManager()).thenReturn(lm); mDisp = mock(EventHandler.class); + Dispatcher disp = mock(Dispatcher.class); + when(rmContext.getDispatcher()).thenReturn(disp); + when(disp.getEventHandler()).thenReturn(mDisp); rand = new Random(); long seed = rand.nextLong(); System.out.println(name.getMethodName() + " SEED: " + seed); @@ -866,12 +871,12 @@ public class TestProportionalCapacityPreemptionPolicy { static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; - private final ContainerPreemptEventType type; + private final SchedulerEventType type; IsPreemptionRequestFor(ApplicationAttemptId appAttId) { this(appAttId, PREEMPT_CONTAINER); } IsPreemptionRequestFor(ApplicationAttemptId appAttId, - ContainerPreemptEventType type) { + SchedulerEventType type) { this.appAttId = appAttId; this.type = type; } @@ -888,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicy { ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { ProportionalCapacityPreemptionPolicy policy = - new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock); + new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock); ParentQueue mRoot = buildMockRootQueue(rand, qData); when(mCS.getRootQueue()).thenReturn(mRoot); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5192ca9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index e13320c..114769c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -50,13 +50,13 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -92,7 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { private Configuration conf = null; private CapacitySchedulerConfiguration csConf = null; private CapacityScheduler cs = null; - private EventHandler mDisp = null; + private EventHandler mDisp = null; private ProportionalCapacityPreemptionPolicy policy = null; private Resource clusterResource = null; @@ -125,11 +126,14 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { rmContext = mock(RMContext.class); when(rmContext.getNodeLabelManager()).thenReturn(nlm); + Dispatcher disp = mock(Dispatcher.class); + when(rmContext.getDispatcher()).thenReturn(disp); + when(disp.getEventHandler()).thenReturn(mDisp); csConf = new CapacitySchedulerConfiguration(); when(cs.getConfiguration()).thenReturn(csConf); when(cs.getRMContext()).thenReturn(rmContext); - policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock); + policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock); partitionToResource = new HashMap<>(); nodeIdToSchedulerNodes = new HashMap<>(); nameToCSQueues = new HashMap<>(); @@ -828,7 +832,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { when(cs.getClusterResource()).thenReturn(clusterResource); mockApplications(appsConfig); - policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock); + policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock); } private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,