Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1305518A3B for ; Thu, 2 Jul 2015 18:08:32 +0000 (UTC) Received: (qmail 41218 invoked by uid 500); 2 Jul 2015 18:08:31 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 41148 invoked by uid 500); 2 Jul 2015 18:08:31 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 41139 invoked by uid 99); 2 Jul 2015 18:08:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jul 2015 18:08:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 94DD4E364C; Thu, 2 Jul 2015 18:08:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangda@apache.org To: common-commits@hadoop.apache.org Message-Id: <9d2ab383b21c410e90f490de576ab7b5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda) Date: Thu, 2 Jul 2015 18:08:31 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2.7 055d9292a -> a6f6ba95e YARN-3508. Prevent processing preemption events on the main RM dispatcher. 
(Varun Saxena via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a6f6ba95 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a6f6ba95 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a6f6ba95 Branch: refs/heads/branch-2.7 Commit: a6f6ba95ef822842b94efe025dd478c5e1a469b8 Parents: 055d929 Author: Wangda Tan Authored: Thu Jul 2 11:08:21 2015 -0700 Committer: Wangda Tan Committed: Thu Jul 2 11:08:21 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../server/resourcemanager/ResourceManager.java | 34 --------- .../monitor/SchedulingEditPolicy.java | 6 +- .../monitor/SchedulingMonitor.java | 3 +- .../ProportionalCapacityPreemptionPolicy.java | 40 +++++----- .../scheduler/ContainerPreemptEvent.java | 8 +- .../scheduler/ContainerPreemptEventType.java | 26 ------- .../scheduler/capacity/CapacityScheduler.java | 24 ++++++ .../scheduler/event/SchedulerEventType.java | 7 +- .../resourcemanager/TestRMDispatcher.java | 79 ++++++++++++++++++++ ...estProportionalCapacityPreemptionPolicy.java | 21 +++--- 11 files changed, 151 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a371683..d5ecf96 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -15,6 +15,9 @@ Release 2.7.2 - UNRELEASED YARN-3793. Several NPEs when deleting local files on NM recovery (Varun Saxena via jlowe) + YARN-3508. Prevent processing preemption events on the main RM dispatcher. 
+ (Varun Saxena via wangda) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8bd8e21..ed4cdc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; @@ -610,9 +609,6 @@ public class ResourceManager extends CompositeService implements Recoverable { YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, SchedulingEditPolicy.class); if (policies.size() > 0) { - 
rmDispatcher.register(ContainerPreemptEventType.class, - new RMContainerPreemptEventDispatcher( - (PreemptableResourceScheduler) scheduler)); for (SchedulingEditPolicy policy : policies) { LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); // periodically check whether we need to take action to guarantee @@ -783,36 +779,6 @@ public class ResourceManager extends CompositeService implements Recoverable { } @Private - public static final class - RMContainerPreemptEventDispatcher - implements EventHandler { - - private final PreemptableResourceScheduler scheduler; - - public RMContainerPreemptEventDispatcher( - PreemptableResourceScheduler scheduler) { - this.scheduler = scheduler; - } - - @Override - public void handle(ContainerPreemptEvent event) { - ApplicationAttemptId aid = event.getAppId(); - RMContainer container = event.getContainer(); - switch (event.getType()) { - case DROP_RESERVATION: - scheduler.dropContainerReservation(container); - break; - case PREEMPT_CONTAINER: - scheduler.preemptContainer(aid, container); - break; - case KILL_CONTAINER: - scheduler.killContainer(container); - break; - } - } - } - - @Private public static final class ApplicationAttemptEventDispatcher implements EventHandler { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java index 1ebc19f..0d587d8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java @@ -18,14 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; public interface SchedulingEditPolicy { - public void init(Configuration config, - EventHandler dispatcher, + public void init(Configuration config, RMContext context, PreemptableResourceScheduler scheduler); /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 1682f7d..d4c129b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -54,9 +54,8 @@ public class SchedulingMonitor extends AbstractService { return scheduleEditPolicy; } - @SuppressWarnings("unchecked") public void serviceInit(Configuration conf) throws Exception { - scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(), + scheduleEditPolicy.init(conf, rmContext, (PreemptableResourceScheduler) rmContext.getScheduler()); this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); super.serviceInit(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 87a2a00..abcb1a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -30,7 +30,6 @@ import java.util.NavigableSet; import java.util.PriorityQueue; import java.util.Set; -import 
org.apache.commons.collections.map.HashedMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,19 +37,18 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -116,8 +114,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic public static final String NATURAL_TERMINATION_FACTOR = "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; - // the dispatcher to send preempt and kill events - public 
EventHandler dispatcher; + private RMContext rmContext; private final Clock clock; private double maxIgnoredOverCapacity; @@ -137,20 +134,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic } public ProportionalCapacityPreemptionPolicy(Configuration config, - EventHandler dispatcher, - CapacityScheduler scheduler) { - this(config, dispatcher, scheduler, new SystemClock()); + RMContext context, CapacityScheduler scheduler) { + this(config, context, scheduler, new SystemClock()); } public ProportionalCapacityPreemptionPolicy(Configuration config, - EventHandler dispatcher, - CapacityScheduler scheduler, Clock clock) { - init(config, dispatcher, scheduler); + RMContext context, CapacityScheduler scheduler, Clock clock) { + init(config, context, scheduler); this.clock = clock; } - public void init(Configuration config, - EventHandler disp, + public void init(Configuration config, RMContext context, PreemptableResourceScheduler sched) { LOG.info("Preemption monitor:" + this.getClass().getCanonicalName()); assert null == scheduler : "Unexpected duplicate call to init"; @@ -159,7 +153,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic sched.getClass().getCanonicalName() + " not instance of " + CapacityScheduler.class.getCanonicalName()); } - dispatcher = disp; + rmContext = context; scheduler = (CapacityScheduler) sched; maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1); naturalTerminationFactor = @@ -218,6 +212,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param root the root of the CapacityScheduler queue hierarchy * @param clusterResources the total amount of resources in the cluster */ + @SuppressWarnings("unchecked") private void containerBasedPreemptOrKill(CSQueue root, Resource clusterResources) { @@ -252,18 +247,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // preempt (or kill) the 
selected containers for (Map.Entry> e : toPreempt.entrySet()) { + ApplicationAttemptId appAttemptId = e.getKey(); for (RMContainer container : e.getValue()) { // if we tried to preempt this for more than maxWaitTime if (preempted.get(container) != null && preempted.get(container) + maxWaitTime < clock.getTime()) { // kill it - dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container, - ContainerPreemptEventType.KILL_CONTAINER)); + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.KILL_CONTAINER)); preempted.remove(container); } else { //otherwise just send preemption events - dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container, - ContainerPreemptEventType.PREEMPT_CONTAINER)); + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.PREEMPT_CONTAINER)); if (preempted.get(container) == null) { preempted.put(container, clock.getTime()); } @@ -634,6 +632,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic * @param rsrcPreempt * @return Set Set of RMContainers */ + @SuppressWarnings("unchecked") private Set preemptFrom(FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt, List skippedAMContainerlist, Resource skippedAMSize) { @@ -649,8 +648,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic return ret; } if (!observeOnly) { - dispatcher.handle(new ContainerPreemptEvent(appId, c, - ContainerPreemptEventType.DROP_RESERVATION)); + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent( + appId, c, SchedulerEventType.DROP_RESERVATION)); } Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java index 8eba48d..7ab2758 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java @@ -19,20 +19,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; /** * Simple event class used to communicate containers unreservations, preemption, killing */ -public class ContainerPreemptEvent - extends AbstractEvent { +public class ContainerPreemptEvent extends SchedulerEvent { private final ApplicationAttemptId aid; private final RMContainer container; public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container, - ContainerPreemptEventType type) { + SchedulerEventType type) { 
super(type); this.aid = aid; this.container = container; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java deleted file mode 100644 index a70a836..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler; - -public enum ContainerPreemptEventType { - - DROP_RESERVATION, - PREEMPT_CONTAINER, - KILL_CONTAINER - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 70fe57e..e43003b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -1212,6 +1213,29 @@ public class 
CapacityScheduler extends RMContainerEventType.EXPIRE); } break; + case DROP_RESERVATION: + { + ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event; + RMContainer container = dropReservationEvent.getContainer(); + dropContainerReservation(container); + } + break; + case PREEMPT_CONTAINER: + { + ContainerPreemptEvent preemptContainerEvent = + (ContainerPreemptEvent)event; + ApplicationAttemptId aid = preemptContainerEvent.getAppId(); + RMContainer containerToBePreempted = preemptContainerEvent.getContainer(); + preemptContainer(aid, containerToBePreempted); + } + break; + case KILL_CONTAINER: + { + ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; + RMContainer containerToBeKilled = killContainerEvent.getContainer(); + killContainer(containerToBeKilled); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 13aecb3..9de935b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -36,5 +36,10 @@ public enum SchedulerEventType { APP_ATTEMPT_REMOVED, // Source: ContainerAllocationExpirer - CONTAINER_EXPIRED + CONTAINER_EXPIRED, + + // Source: SchedulingEditPolicy + DROP_RESERVATION, + PREEMPT_CONTAINER, + KILL_CONTAINER } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java new file mode 100644 index 0000000..db7c96a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.junit.Assert; +import org.junit.Test; + +public class TestRMDispatcher { + + @SuppressWarnings("unchecked") + @Test(timeout=10000) + public void testSchedulerEventDispatcherForPreemptionEvents() { + AsyncDispatcher rmDispatcher = new AsyncDispatcher(); + CapacityScheduler sched = spy(new CapacityScheduler()); + YarnConfiguration conf = new YarnConfiguration(); + SchedulerEventDispatcher schedulerDispatcher = + new SchedulerEventDispatcher(sched); + rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); + rmDispatcher.init(conf); + rmDispatcher.start(); + 
schedulerDispatcher.init(conf); + schedulerDispatcher.start(); + try { + ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); + RMContainer container = mock(RMContainer.class); + ContainerPreemptEvent event1 = new ContainerPreemptEvent( + appAttemptId, container, SchedulerEventType.DROP_RESERVATION); + rmDispatcher.getEventHandler().handle(event1); + ContainerPreemptEvent event2 = new ContainerPreemptEvent( + appAttemptId, container, SchedulerEventType.KILL_CONTAINER); + rmDispatcher.getEventHandler().handle(event2); + ContainerPreemptEvent event3 = new ContainerPreemptEvent( + appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER); + rmDispatcher.getEventHandler().handle(event3); + // Wait for events to be processed by scheduler dispatcher. + Thread.sleep(1000); + verify(sched, times(3)).handle(any(SchedulerEvent.class)); + verify(sched).dropContainerReservation(container); + verify(sched).preemptContainer(appAttemptId, container); + verify(sched).killContainer(container); + } catch (InterruptedException e) { + Assert.fail(); + } finally { + schedulerDispatcher.stop(); + rmDispatcher.stop(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6f6ba95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 8f5237e..8e9545d 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,8 +23,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -53,7 +53,6 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.TreeSet; -import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import 
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -71,13 +71,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -88,7 +89,6 @@ import org.junit.Test; import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; -import org.mortbay.log.Log; public class TestProportionalCapacityPreemptionPolicy { @@ -105,7 +105,7 @@ public class TestProportionalCapacityPreemptionPolicy { RMContext rmContext = null; RMNodeLabelsManager lm = 
null; CapacitySchedulerConfiguration schedConf = null; - EventHandler<ContainerPreemptEvent> mDisp = null; + EventHandler<SchedulerEvent> mDisp = null; ResourceCalculator rc = new DefaultResourceCalculator(); Resource clusterResources = null; final ApplicationAttemptId appA = ApplicationAttemptId.newInstance( @@ -165,6 +165,9 @@ public class TestProportionalCapacityPreemptionPolicy { when(mCS.getRMContext()).thenReturn(rmContext); when(rmContext.getNodeLabelManager()).thenReturn(lm); mDisp = mock(EventHandler.class); + Dispatcher disp = mock(Dispatcher.class); + when(rmContext.getDispatcher()).thenReturn(disp); + when(disp.getEventHandler()).thenReturn(mDisp); rand = new Random(); long seed = rand.nextLong(); System.out.println(name.getMethodName() + " SEED: " + seed); @@ -911,12 +914,12 @@ public class TestProportionalCapacityPreemptionPolicy { static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; - private final ContainerPreemptEventType type; + private final SchedulerEventType type; IsPreemptionRequestFor(ApplicationAttemptId appAttId) { this(appAttId, PREEMPT_CONTAINER); } IsPreemptionRequestFor(ApplicationAttemptId appAttId, - ContainerPreemptEventType type) { + SchedulerEventType type) { this.appAttId = appAttId; this.type = type; } @@ -933,7 +936,7 @@ public class TestProportionalCapacityPreemptionPolicy { ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { ProportionalCapacityPreemptionPolicy policy = - new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock); + new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock); ParentQueue mRoot = buildMockRootQueue(rand, qData); when(mCS.getRootQueue()).thenReturn(mRoot);