From: rakeshr@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 17 Feb 2017 14:49:57 -0000
Subject: [17/50] [abbrv] hadoop git commit: YARN-6163. FS Preemption is a
 trickle for severely starved applications. (kasha)

YARN-6163. FS Preemption is a trickle for severely starved applications.
(kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6c25dbcd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6c25dbcd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6c25dbcd

Branch: refs/heads/HDFS-10285
Commit: 6c25dbcdc0517a825b92fb16444aa1d3761e160c
Parents: a136936
Author: Karthik Kambatla
Authored: Wed Feb 15 23:16:01 2017 -0800
Committer: Karthik Kambatla
Committed: Wed Feb 15 23:16:12 2017 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/util/resource/Resources.java    |  18 +++
 .../scheduler/AbstractYarnScheduler.java        |   4 +
 .../scheduler/fair/FSAppAttempt.java            | 110 ++++++++++++--
 .../scheduler/fair/FSLeafQueue.java             | 111 +++++++++-----
 .../scheduler/fair/FSPreemptionThread.java      | 132 ++++++++---------
 .../scheduler/fair/FairScheduler.java           |   4 +
 .../fair/FairSchedulerConfiguration.java        |  23 ++-
 .../fair/VisitedResourceRequestTracker.java     | 146 +++++++++++++++++++
 .../fair/FairSchedulerWithMockPreemption.java   |   5 +-
 .../scheduler/fair/TestFSAppStarvation.java     |  20 ++-
 .../fair/TestFairSchedulerPreemption.java       |  45 +++---
 .../fair/TestVisitedResourceRequestTracker.java | 112 ++++++++++++++
 12 files changed, 585 insertions(+), 145 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 044a232..57b3a46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -182,6 +182,24 @@ public class Resources {
     return subtractFrom(clone(lhs), rhs);
   }
 
+  /**
+   * Subtract rhs from lhs and reset any negative values to zero.
+   *
+   * @param lhs {@link Resource} to subtract from
+   * @param rhs {@link Resource} to subtract
+   * @return the value of lhs after subtraction
+   */
+  public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
+    subtractFrom(lhs, rhs);
+    if (lhs.getMemorySize() < 0) {
+      lhs.setMemorySize(0);
+    }
+    if (lhs.getVirtualCores() < 0) {
+      lhs.setVirtualCores(0);
+    }
+    return lhs;
+  }
+
   public static Resource negate(Resource resource) {
     return subtract(NONE, resource);
   }
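[Editor's note, not part of the commit: a minimal usage sketch of the new clamping helper, assuming a YARN classpath where Resource.newInstance is available.]

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class SubtractClampDemo {
      public static void main(String[] args) {
        Resource lhs = Resource.newInstance(4096, 4); // 4 GB, 4 vcores
        Resource rhs = Resource.newInstance(6144, 2); // 6 GB, 2 vcores

        // Plain subtractFrom would leave memory at -2048; the new helper
        // clamps each component at zero instead.
        Resources.subtractFromNonNegative(lhs, rhs);
        System.out.println(lhs.getMemorySize() + " " + lhs.getVirtualCores()); // 0 2
      }
    }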
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 64427b7..ce6d2a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -127,6 +127,7 @@ public abstract class AbstractYarnScheduler
    */
   protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected int nmExpireInterval;
+  protected long nmHeartbeatInterval;
 
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
@@ -163,6 +164,9 @@
     nmExpireInterval =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    nmHeartbeatInterval =
+        conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
     long configuredMaximumAllocationWaitTime =
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 6dfcc84..563b892 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -87,6 +87,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
   private Resource fairshareStarvation = Resources.none();
   private long lastTimeAtFairShare;
+  private long nextStarvationCheck;
 
   // minShareStarvation attributed to this application by the leaf queue
   private Resource minshareStarvation = Resources.none();
@@ -211,15 +212,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
     }
     for (FSSchedulerNode node: blacklistNodeIds) {
-      Resources.subtractFrom(availableResources,
+      Resources.subtractFromNonNegative(availableResources,
           node.getUnallocatedResource());
     }
-    if (availableResources.getMemorySize() < 0) {
-      availableResources.setMemorySize(0);
-    }
-    if (availableResources.getVirtualCores() < 0) {
-      availableResources.setVirtualCores(0);
-    }
   }
 
   /**
@@ -530,6 +525,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
+   * Get last computed fairshare starvation.
+   *
+   * @return last computed fairshare starvation
+   */
+  Resource getFairshareStarvation() {
+    return fairshareStarvation;
+  }
+
+  /**
    * Set the minshare attributed to this application. To be called only from
    * {@link FSLeafQueue#updateStarvedApps}.
    *
@@ -1077,17 +1081,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
-   * Helper method that computes the extent of fairshare fairshareStarvation.
+   * Helper method that computes the extent of fairshare starvation.
+   * @return freshly computed fairshare starvation
    */
   Resource fairShareStarvation() {
     Resource threshold = Resources.multiply(
         getFairShare(), fsQueue.getFairSharePreemptionThreshold());
-    Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+    Resource starvation = Resources.componentwiseMin(threshold, demand);
+    Resources.subtractFromNonNegative(starvation, getResourceUsage());
 
     long now = scheduler.getClock().getTime();
-    boolean starved = Resources.greaterThan(
-        fsQueue.getPolicy().getResourceCalculator(),
-        scheduler.getClusterResource(), starvation, Resources.none());
+    boolean starved = !Resources.isNone(starvation);
 
     if (!starved) {
       lastTimeAtFairShare = now;
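[Editor's note, not part of the commit: the recomputed starvation is now capped both by the preemption threshold and by the app's actual demand. A single-resource rendering of that arithmetic with made-up values; the real code works on multi-dimensional Resource objects.]

    public class StarvationMath {
      public static void main(String[] args) {
        long fairShareMb = 8192;  // app's fair share
        long demandMb = 3072;     // what the app actually asks for
        long usageMb = 1024;      // what it currently holds
        double threshold = 0.8;   // fair-share preemption threshold

        // starvation = min(fairShare * threshold, demand) - usage, floored at 0
        long starvedMb = Math.max(0,
            Math.min((long) (fairShareMb * threshold), demandMb) - usageMb);
        System.out.println(starvedMb); // 2048

        // The replaced code ignored demand and would have reported
        // roughly 6553 - 1024 = 5529 MB of starvation here.
      }
    }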
@@ -1111,6 +1115,81 @@
     return !Resources.isNone(fairshareStarvation);
   }
 
+  /**
+   * Fetch a list of RRs corresponding to the extent the app is starved
+   * (fairshare and minshare). This method considers the number of containers
+   * in a RR and also only one locality-level (the first encountered
+   * resourceName).
+   *
+   * @return list of {@link ResourceRequest}s corresponding to the amount of
+   * starvation.
+   */
+  List<ResourceRequest> getStarvedResourceRequests() {
+    // List of RRs we build in this method to return
+    List<ResourceRequest> ret = new ArrayList<>();
+
+    // Track visited RRs to avoid the same RR at multiple locality levels
+    VisitedResourceRequestTracker visitedRRs =
+        new VisitedResourceRequestTracker(scheduler.getNodeTracker());
+
+    // Start with current starvation and track the pending amount
+    Resource pending = getStarvation();
+    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+      if (Resources.isNone(pending)) {
+        // Found enough RRs to match the starvation
+        break;
+      }
+
+      // See if we have already seen this RR
+      if (!visitedRRs.visit(rr)) {
+        continue;
+      }
+
+      // A RR can have multiple containers of a capability. We need to
+      // compute the number of containers that fit in "pending".
+      int numContainersThatFit = (int) Math.floor(
+          Resources.ratio(scheduler.getResourceCalculator(),
+              pending, rr.getCapability()));
+      if (numContainersThatFit == 0) {
+        // This RR's capability is too large to fit in pending
+        continue;
+      }
+
+      // If the RR is only partially being satisfied, include only the
+      // partial number of containers.
+      if (numContainersThatFit < rr.getNumContainers()) {
+        rr = ResourceRequest.newInstance(rr.getPriority(),
+            rr.getResourceName(), rr.getCapability(), numContainersThatFit);
+      }
+
+      // Add the RR to return list and adjust "pending" accordingly
+      ret.add(rr);
+      Resources.subtractFromNonNegative(pending,
+          Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+    }
+
+    return ret;
+  }
+
+  /**
+   * Notify this app that preemption has been triggered to make room for
+   * outstanding demand. The app should not be considered starved until after
+   * the specified delay.
+   *
+   * @param delayBeforeNextStarvationCheck duration to wait
+   */
+  void preemptionTriggered(long delayBeforeNextStarvationCheck) {
+    nextStarvationCheck =
+        scheduler.getClock().getTime() + delayBeforeNextStarvationCheck;
+  }
+
+  /**
+   * Whether this app's starvation should be considered.
+   */
+  boolean shouldCheckForStarvation() {
+    return scheduler.getClock().getTime() >= nextStarvationCheck;
+  }
+
   /* Schedulable methods implementation */
 
   @Override
@@ -1123,6 +1202,13 @@
     return demand;
   }
 
+  /**
+   * Get the current app's unsatisfied demand.
+   */
+  Resource getPendingDemand() {
+    return Resources.subtract(demand, getResourceUsage());
+  }
+
   @Override
   public long getStartTime() {
     return startTime;
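[Editor's note, not part of the commit: for a single resource, the container-count computation above reduces to an integer division capped by the request's own container count. A standalone illustration with hypothetical numbers.]

    public class ContainersThatFit {
      public static void main(String[] args) {
        long pendingMb = 5120;    // starvation left to cover
        long capabilityMb = 2048; // per-container size in one ResourceRequest
        int requested = 4;        // containers asked for in that request

        // floor(pending / capability), further capped by the request itself
        int fit = (int) Math.min(requested, pendingMb / capabilityMb);
        System.out.println(fit); // 2 -- only part of the RR counts as starved
      }
    }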
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 16070e0..c4b2de6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -220,54 +220,53 @@ public class FSLeafQueue extends FSQueue {
   }
 
   /**
-   * Helper method to identify starved applications. This needs to be called
-   * ONLY from {@link #updateInternal}, after the application shares
-   * are updated.
-   *
-   * A queue can be starving due to fairshare or minshare.
-   *
-   * Minshare is defined only on the queue and not the applications.
-   * Fairshare is defined for both the queue and the applications.
-   *
-   * If this queue is starved due to minshare, we need to identify the most
-   * deserving apps if they themselves are not starved due to fairshare.
+   * Compute the extent of fairshare starvation for a set of apps.
    *
-   * If this queue is starving due to fairshare, there must be at least
-   * one application that is starved. And, even if the queue is not
-   * starved due to fairshare, there might still be starved applications.
+   * @param appsWithDemand apps to compute fairshare starvation for
+   * @return aggregate fairshare starvation for all apps
    */
-  private void updateStarvedApps() {
-    // First identify starved applications and track total amount of
-    // starvation (in resources)
+  private Resource updateStarvedAppsFairshare(
+      TreeSet<FSAppAttempt> appsWithDemand) {
     Resource fairShareStarvation = Resources.clone(none());
-    // Fetch apps with unmet demand sorted by fairshare starvation
-    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
     for (FSAppAttempt app : appsWithDemand) {
       Resource appStarvation = app.fairShareStarvation();
-      if (!Resources.equals(Resources.none(), appStarvation)) {
+      if (!Resources.isNone(appStarvation)) {
         context.getStarvedApps().addStarvedApp(app);
         Resources.addTo(fairShareStarvation, appStarvation);
       } else {
         break;
       }
     }
+    return fairShareStarvation;
+  }
 
-    // Compute extent of minshare starvation
-    Resource minShareStarvation = minShareStarvation();
-
-    // Compute minshare starvation that is not subsumed by fairshare starvation
-    Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+  /**
+   * Distribute minshare starvation to a set of apps.
+   * @param appsWithDemand set of apps
+   * @param minShareStarvation minshare starvation to distribute
+   */
+  private void updateStarvedAppsMinshare(
+      final TreeSet<FSAppAttempt> appsWithDemand,
+      final Resource minShareStarvation) {
+    Resource pending = Resources.clone(minShareStarvation);
 
     // Keep adding apps to the starved list until the unmet demand goes over
     // the remaining minshare
     for (FSAppAttempt app : appsWithDemand) {
-      if (Resources.greaterThan(policy.getResourceCalculator(),
-          scheduler.getClusterResource(), minShareStarvation, none())) {
-        Resource appPendingDemand =
-            Resources.subtract(app.getDemand(), app.getResourceUsage());
-        Resources.subtractFrom(minShareStarvation, appPendingDemand);
-        app.setMinshareStarvation(appPendingDemand);
+      if (!Resources.isNone(pending)) {
+        Resource appMinShare = app.getPendingDemand();
+        Resources.subtractFromNonNegative(
+            appMinShare, app.getFairshareStarvation());
+
+        if (Resources.greaterThan(policy.getResourceCalculator(),
+            scheduler.getClusterResource(), appMinShare, pending)) {
+          Resources.subtractFromNonNegative(appMinShare, pending);
+          pending = none();
+        } else {
+          Resources.subtractFromNonNegative(pending, appMinShare);
+        }
+        app.setMinshareStarvation(appMinShare);
         context.getStarvedApps().addStarvedApp(app);
       } else {
         // Reset minshare starvation in case we had set it in a previous
@@ -277,6 +276,40 @@
     }
   }
 
+  /**
+   * Helper method to identify starved applications. This needs to be called
+   * ONLY from {@link #updateInternal}, after the application shares
+   * are updated.
+   *
+   * A queue can be starving due to fairshare or minshare.
+   *
+   * Minshare is defined only on the queue and not the applications.
+   * Fairshare is defined for both the queue and the applications.
+   *
+   * If this queue is starved due to minshare, we need to identify the most
+   * deserving apps if they themselves are not starved due to fairshare.
+   *
+   * If this queue is starving due to fairshare, there must be at least
+   * one application that is starved. And, even if the queue is not
+   * starved due to fairshare, there might still be starved applications.
+   */
+  private void updateStarvedApps() {
+    // Fetch apps with pending demand
+    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
+
+    // Process apps with fairshare starvation
+    Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand);
+
+    // Compute extent of minshare starvation
+    Resource minShareStarvation = minShareStarvation();
+
+    // Compute minshare starvation that is not subsumed by fairshare starvation
+    Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation);
+
+    // Assign this minshare to apps with pending demand over fairshare
+    updateStarvedAppsMinshare(appsWithDemand, minShareStarvation);
+  }
+
   @Override
   public Resource getDemand() {
     return demand;
@@ -352,7 +385,7 @@ public class FSLeafQueue extends FSQueue {
       return assigned;
     }
 
-    for (FSAppAttempt sched : fetchAppsWithDemand()) {
+    for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
         continue;
       }
@@ -368,14 +401,24 @@
     return assigned;
   }
 
-  private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+  /**
+   * Fetch the subset of apps that have unmet demand. When used for
+   * preemption-related code (as opposed to allocation), omits apps that
+   * should not be checked for starvation.
+   *
+   * @param assignment whether the apps are fetched for allocating containers,
+   * as opposed to preemption calculations
+   * @return Set of apps with unmet demand
+   */
+  private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
     TreeSet<FSAppAttempt> pendingForResourceApps =
         new TreeSet<>(policy.getComparator());
     readLock.lock();
     try {
       for (FSAppAttempt app : runnableApps) {
         Resource pending = app.getAppAttemptResourceUsage().getPending();
-        if (!pending.equals(none())) {
+        if (!Resources.isNone(pending) &&
+            (assignment || app.shouldCheckForStarvation())) {
           pendingForResourceApps.add(app);
         }
       }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index f166878..af73c10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
@@ -43,20 +41,26 @@ class FSPreemptionThread extends Thread {
   protected final FSContext context;
   private final FairScheduler scheduler;
   private final long warnTimeBeforeKill;
+  private final long delayBeforeNextStarvationCheck;
   private final Timer preemptionTimer;
 
   FSPreemptionThread(FairScheduler scheduler) {
+    setDaemon(true);
+    setName("FSPreemptionThread");
     this.scheduler = scheduler;
     this.context = scheduler.getContext();
     FairSchedulerConfiguration fsConf = scheduler.getConf();
     context.setPreemptionEnabled();
     context.setPreemptionUtilizationThreshold(
         fsConf.getPreemptionUtilizationThreshold());
-    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
     preemptionTimer = new Timer("Preemption Timer", true);
-    setDaemon(true);
-    setName("FSPreemptionThread");
+
+    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+    long allocDelay = (fsConf.isContinuousSchedulingEnabled()
+        ? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs
+        : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
+    delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
+        fsConf.getWaitTimeBeforeNextStarvationCheck();
   }
 
   public void run() {
@@ -64,13 +68,8 @@ class FSPreemptionThread extends Thread {
       FSAppAttempt starvedApp;
       try {
        starvedApp = context.getStarvedApps().take();
-        if (!Resources.isNone(starvedApp.getStarvation())) {
-          PreemptableContainers containers =
-              identifyContainersToPreempt(starvedApp);
-          if (containers != null) {
-            preemptContainers(containers.containers);
-          }
-        }
+        preemptContainers(identifyContainersToPreempt(starvedApp));
+        starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
       } catch (InterruptedException e) {
         LOG.info("Preemption thread interrupted! Exiting.");
         return;
      }
    }
  }

@@ -79,58 +78,57 @@
   }
 
   /**
-   * Given an app, identify containers to preempt to satisfy the app's next
-   * resource request.
+   * Given an app, identify containers to preempt to satisfy the app's
+   * starvation.
+   *
+   * Mechanics:
+   * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of
+   * starvation.
+   * 2. For each {@link ResourceRequest}, iterate through matching
+   * nodes and identify containers to preempt all on one node, also
+   * optimizing for least number of AM container preemptions.
    *
    * @param starvedApp starved application for which we are identifying
    * preemption targets
-   * @return list of containers to preempt to satisfy starvedApp, null if the
-   * app cannot be satisfied by preempting any running containers
+   * @return list of containers to preempt to satisfy starvedApp
    */
-  private PreemptableContainers identifyContainersToPreempt(
+  private List<RMContainer> identifyContainersToPreempt(
       FSAppAttempt starvedApp) {
-    PreemptableContainers bestContainers = null;
-
-    // Find the nodes that match the next resource request
-    SchedulingPlacementSet nextPs =
-        starvedApp.getAppSchedulingInfo().getFirstSchedulingPlacementSet();
-    PendingAsk firstPendingAsk = nextPs.getPendingAsk(ResourceRequest.ANY);
-    // TODO (KK): Should we check other resource requests if we can't match
-    // the first one?
-
-    Resource requestCapability = firstPendingAsk.getPerAllocationResource();
-
-    List<FSSchedulerNode> potentialNodes =
-        scheduler.getNodeTracker().getNodesByResourceName(
-            nextPs.getAcceptedResouceNames().next().toString());
+    List<RMContainer> containersToPreempt = new ArrayList<>();
+
+    // Iterate through enough RRs to address app's starvation
+    for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
+      for (int i = 0; i < rr.getNumContainers(); i++) {
+        PreemptableContainers bestContainers = null;
+        List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
+            .getNodesByResourceName(rr.getResourceName());
+        for (FSSchedulerNode node : potentialNodes) {
+          // TODO (YARN-5829): Attempt to reserve the node for starved app.
+          if (isNodeAlreadyReserved(node, starvedApp)) {
+            continue;
+          }
 
-    // From the potential nodes, pick a node that has enough containers
-    // from apps over their fairshare
-    for (FSSchedulerNode node : potentialNodes) {
-      // TODO (YARN-5829): Attempt to reserve the node for starved app. The
-      // subsequent if-check needs to be reworked accordingly.
-      FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
-      if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
-        // This node is already reserved by another app. Let us not consider
-        // this for preemption.
-        continue;
-      }
+          int maxAMContainers = bestContainers == null ?
+              Integer.MAX_VALUE : bestContainers.numAMContainers;
+          PreemptableContainers preemptableContainers =
+              identifyContainersToPreemptOnNode(
+                  rr.getCapability(), node, maxAMContainers);
+          if (preemptableContainers != null) {
+            // This set is better than any previously identified set.
+            bestContainers = preemptableContainers;
+            if (preemptableContainers.numAMContainers == 0) {
+              break;
+            }
+          }
+        } // End of iteration through nodes for one RR
 
-      int maxAMContainers = bestContainers == null ?
-          Integer.MAX_VALUE : bestContainers.numAMContainers;
-      PreemptableContainers preemptableContainers =
-          identifyContainersToPreemptOnNode(requestCapability, node,
-              maxAMContainers);
-      if (preemptableContainers != null) {
-        if (preemptableContainers.numAMContainers == 0) {
-          return preemptableContainers;
-        } else {
-          bestContainers = preemptableContainers;
+        if (bestContainers != null && bestContainers.containers.size() > 0) {
+          containersToPreempt.addAll(bestContainers.containers);
+          trackPreemptionsAgainstNode(bestContainers.containers);
         }
       }
-    }
-
-    return bestContainers;
+    } // End of iteration over RRs
+    return containersToPreempt;
   }
 
   /**
@@ -181,23 +179,25 @@
     return null;
   }
 
-  private void preemptContainers(List<RMContainer> containers) {
-    // Mark the containers as being considered for preemption on the node.
-    // Make sure the containers are subsequently removed by calling
-    // FSSchedulerNode#removeContainerForPreemption.
-    if (containers.size() > 0) {
-      FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
-          .getNode(containers.get(0).getNodeId());
-      node.addContainersForPreemption(containers);
-    }
+  private boolean isNodeAlreadyReserved(
+      FSSchedulerNode node, FSAppAttempt app) {
+    FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+    return nodeReservedApp != null && !nodeReservedApp.equals(app);
+  }
 
+  private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+    FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
+        .getNode(containers.get(0).getNodeId());
+    node.addContainersForPreemption(containers);
+  }
+
+  private void preemptContainers(List<RMContainer> containers) {
     // Warn application about containers to be killed
     for (RMContainer container : containers) {
       ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
       FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
-      FSLeafQueue queue = app.getQueue();
       LOG.info("Preempting container " + container +
-          " from queue " + queue.getName());
+          " from queue " + app.getQueueName());
       app.trackContainerForPreemption(container);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 134efff..18806bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1774,4 +1774,8 @@ public class FairScheduler extends
   public float getReservableNodesRatio() {
     return reservableNodesRatio;
   }
+
+  long getNMHeartbeatInterval() {
+    return nmHeartbeatInterval;
+  }
 }
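[Editor's note, not part of the commit: a back-of-the-envelope computation of the starvation-check delay with continuous scheduling off. The 15 s kill wait and 10 s starvation wait defaults appear in the configuration change below; the 1000 ms NM heartbeat is an assumed cluster default.]

    public class StarvationDelayMath {
      public static void main(String[] args) {
        long warnTimeBeforeKill = 15000;  // waitTimeBeforeKill default
        long nmHeartbeatMs = 1000;        // assumed RM-NM heartbeat interval
        long allocDelay = 4 * nmHeartbeatMs;  // "4 heartbeats"
        long waitBeforeNextCheck = 10000; // waitTimeBeforeNextStarvationCheck default

        // Mirrors delayBeforeNextStarvationCheck in FSPreemptionThread
        System.out.println(
            warnTimeBeforeKill + allocDelay + waitBeforeNextCheck); // 29000 ms
      }
    }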
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index b18dd7d..8e8e37b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -114,12 +114,24 @@ public class FairSchedulerConfiguration extends Configuration {
   protected static final String PREEMPTION_THRESHOLD =
       CONF_PREFIX + "preemption.cluster-utilization-threshold";
   protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
-
-  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
-  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+
   protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
   protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
 
+  /**
+   * Configurable delay (ms) before an app's starvation is considered after
+   * it is identified. This is to give the scheduler enough time to
+   * allocate containers post preemption. This delay is added to the
+   * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats.
+   *
+   * This is intended to be a backdoor on production clusters, and hence
+   * intentionally not documented.
+   */
+  protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
+      CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
+  protected static final long
+      DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000;
+
   /** Whether to assign multiple containers in one check-in. */
   public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
   protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
@@ -251,8 +263,9 @@ public class FairSchedulerConfiguration extends Configuration {
         "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
   }
 
-  public int getPreemptionInterval() {
-    return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+  public long getWaitTimeBeforeNextStarvationCheck() {
+    return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
+        DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS);
   }
 
   public int getWaitTimeBeforeKill() {
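[Editor's note, not part of the commit: the undocumented knob is set like any other FairScheduler property. A sketch assuming CONF_PREFIX is the scheduler's usual "yarn.scheduler.fair." prefix.]

    import org.apache.hadoop.conf.Configuration;

    public class StarvationKnobDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumes CONF_PREFIX == "yarn.scheduler.fair."
        conf.setLong(
            "yarn.scheduler.fair.waitTimeBeforeNextStarvationCheck", 30000L);
        System.out.println(conf.getLong(
            "yarn.scheduler.fair.waitTimeBeforeNextStarvationCheck",
            10000L)); // 30000
      }
    }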
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
new file mode 100644
index 0000000..f157263
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Applications place {@link ResourceRequest}s at multiple levels. This is a
+ * helper class that allows tracking whether a {@link ResourceRequest} has
+ * already been visited at a different locality level.
+ *
+ * This is implemented for {@link FSAppAttempt#getStarvedResourceRequests()}.
+ * The implementation is not thread-safe.
+ */
+class VisitedResourceRequestTracker {
+  private static final Log LOG =
+      LogFactory.getLog(VisitedResourceRequestTracker.class);
+  private final Map<Priority, Map<Resource, TrackerPerPriorityResource>> map =
+      new HashMap<>();
+  private final ClusterNodeTracker<FSSchedulerNode> nodeTracker;
+
+  VisitedResourceRequestTracker(
+      ClusterNodeTracker<FSSchedulerNode> nodeTracker) {
+    this.nodeTracker = nodeTracker;
+  }
+
+  /**
+   * Check if the {@link ResourceRequest} has been visited before, and track
+   * it.
+   * @param rr {@link ResourceRequest} to visit
+   * @return true if rr is the first visit across all locality levels,
+   * false otherwise
+   */
+  boolean visit(ResourceRequest rr) {
+    Priority priority = rr.getPriority();
+    Resource capability = rr.getCapability();
+
+    Map<Resource, TrackerPerPriorityResource> subMap = map.get(priority);
+    if (subMap == null) {
+      subMap = new HashMap<>();
+      map.put(priority, subMap);
+    }
+
+    TrackerPerPriorityResource tracker = subMap.get(capability);
+    if (tracker == null) {
+      tracker = new TrackerPerPriorityResource();
+      subMap.put(capability, tracker);
+    }
+
+    return tracker.visit(rr.getResourceName());
+  }
+
+  private class TrackerPerPriorityResource {
+    private Set<String> racksWithNodesVisited = new HashSet<>();
+    private Set<String> racksVisited = new HashSet<>();
+    private boolean anyVisited;
+
+    private boolean visitAny() {
+      if (racksVisited.isEmpty() && racksWithNodesVisited.isEmpty()) {
+        anyVisited = true;
+      }
+      return anyVisited;
+    }
+
+    private boolean visitRack(String rackName) {
+      if (anyVisited || racksWithNodesVisited.contains(rackName)) {
+        return false;
+      } else {
+        racksVisited.add(rackName);
+        return true;
+      }
+    }
+
+    private boolean visitNode(String rackName) {
+      if (anyVisited || racksVisited.contains(rackName)) {
+        return false;
+      } else {
+        racksWithNodesVisited.add(rackName);
+        return true;
+      }
+    }
+
+    /**
+     * Based on whether resourceName is a node, rack or ANY,
+     * check if this has been visited earlier.
+     *
+     * A node is considered visited if its rack or ANY have been visited.
+     * A rack is considered visited if any of its nodes or ANY have been
+     * visited.
+     * ANY is considered visited if any of the nodes/racks have been visited.
+     *
+     * @param resourceName nodename or rackname or ANY
+     * @return true if this is the first visit, false otherwise
+     */
+    private boolean visit(String resourceName) {
+      if (resourceName.equals(ResourceRequest.ANY)) {
+        return visitAny();
+      }
+
+      List<FSSchedulerNode> nodes =
+          nodeTracker.getNodesByResourceName(resourceName);
+      int numNodes = nodes.size();
+      if (numNodes == 0) {
+        LOG.error("Found ResourceRequest for a non-existent node/rack named " +
+            resourceName);
+        return false;
+      }
+
+      if (numNodes == 1) {
+        // Found a single node. To be safe, let us verify it is a node and
+        // not a rack with a single node.
+        FSSchedulerNode node = nodes.get(0);
+        if (node.getNodeName().equals(resourceName)) {
+          return visitNode(node.getRackName());
+        }
+      }
+
+      // At this point, it is not ANY or a node. Must be a rack
+      return visitRack(resourceName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
index 25780cd..706cdc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -21,6 +21,8 @@ import java.util.HashSet;
 import java.util.Set;
 
 public class FairSchedulerWithMockPreemption extends FairScheduler {
+  static final long DELAY_FOR_NEXT_STARVATION_CHECK_MS = 10 * 60 * 1000;
+
   @Override
   protected void createPreemptionThread() {
     preemptionThread = new MockPreemptionThread(this);
@@ -30,7 +32,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
     private Set<FSAppAttempt> appsAdded = new HashSet<>();
     private int totalAppsAdded = 0;
 
-    MockPreemptionThread(FairScheduler scheduler) {
+    private MockPreemptionThread(FairScheduler scheduler) {
       super(scheduler);
     }
 
@@ -41,6 +43,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
         FSAppAttempt app = context.getStarvedApps().take();
         appsAdded.add(app);
         totalAppsAdded++;
+        app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS);
       } catch (InterruptedException e) {
         return;
       }
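[Editor's note, not part of the commit: the tests below replace wall-clock sleeps with a ControlledClock, which the diff drives via tickSec/tickMsec. A tiny standalone sketch of that pattern.]

    import org.apache.hadoop.yarn.util.ControlledClock;

    public class ClockDemo {
      public static void main(String[] args) {
        ControlledClock clock = new ControlledClock();
        long t0 = clock.getTime();
        clock.tickSec(1);    // advance exactly 1 s, no sleeping
        clock.tickMsec(500); // advance exactly 500 ms
        System.out.println(clock.getTime() - t0); // 1500
      }
    }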
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
index a5b2d86..3a79ac0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.junit.After;
 
 import static org.junit.Assert.assertEquals;
@@ -43,6 +44,8 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
 
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
 
+  private final ControlledClock clock = new ControlledClock();
+
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
   private static final String[] QUEUES =
@@ -99,11 +102,17 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
         + "minshare and fairshare queues", 3,
         preemptionThread.uniqueAppsAdded());
 
-    // Verify the apps get added again on a subsequent update
+    // Verify apps are added again only after the set delay for starvation has
+    // passed.
+    clock.tickSec(1);
     scheduler.update();
-    Thread.yield();
-
+    assertEquals("Apps re-added even before starvation delay passed",
+        preemptionThread.totalAppsAdded(), preemptionThread.uniqueAppsAdded());
     verifyLeafQueueStarvation();
+
+    clock.tickMsec(
+        FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS);
+    scheduler.update();
     assertTrue("Each app is marked as starved exactly once",
         preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
   }
@@ -141,7 +150,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
     sendEnoughNodeUpdatesToAssignFully();
 
     // Sleep to hit the preemption timeouts
-    Thread.sleep(10);
+    clock.tickMsec(10);
 
     // Scheduler update to populate starved apps
     scheduler.update();
@@ -208,8 +217,9 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
         ALLOC_FILE.exists());
 
     resourceManager = new MockRM(conf);
-    resourceManager.start();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.setClock(clock);
+    resourceManager.start();
     preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
         scheduler.preemptionThread;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 16df1ed..a4d69bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.junit.After;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -49,6 +50,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
   private static final int GB = 1024;
 
+  // Scheduler clock
+  private final ControlledClock clock = new ControlledClock();
+
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
 
@@ -60,25 +64,28 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   // Starving app that is expected to instigate preemption
   private FSAppAttempt starvingApp;
 
-  @Parameterized.Parameters
-  public static Collection<Boolean[]> getParameters() {
-    return Arrays.asList(new Boolean[][] {
-        {true}, {false}});
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> getParameters() {
+    return Arrays.asList(new Object[][] {
+        {"FairSharePreemption", true},
+        {"MinSharePreemption", false}});
   }
 
-  public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
+  public TestFairSchedulerPreemption(String name, boolean fairshare)
+      throws IOException {
     fairsharePreemption = fairshare;
     writeAllocFile();
   }
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
         ALLOC_FILE.getAbsolutePath());
     conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
     conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+    setupCluster();
   }
 
   @After
@@ -166,8 +173,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   private void setupCluster() throws IOException {
     resourceManager = new MockRM(conf);
-    resourceManager.start();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.setClock(clock);
+    resourceManager.start();
 
     // Create and add two nodes to the cluster
     addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
@@ -197,7 +205,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    *
    * @param queueName queue name
    */
-  private void takeAllResource(String queueName) {
+  private void takeAllResources(String queueName) {
     // Create an app that takes up all the resources on the cluster
     ApplicationAttemptId appAttemptId
         = createSchedulingRequest(GB, 1, queueName, "default",
@@ -227,8 +235,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
             NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
     starvingApp = scheduler.getSchedulerApp(appAttemptId);
 
-    // Sleep long enough to pass
-    Thread.sleep(10);
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
     scheduler.update();
   }
 
@@ -243,14 +251,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    */
   private void submitApps(String queue1, String queue2)
       throws InterruptedException {
-    takeAllResource(queue1);
+    takeAllResources(queue1);
     preemptHalfResources(queue2);
   }
 
   private void verifyPreemption() throws InterruptedException {
-    // Sleep long enough for four containers to be preempted. Note that the
-    // starved app must be queued four times for containers to be preempted.
-    for (int i = 0; i < 10000; i++) {
+    // Sleep long enough for four containers to be preempted.
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() == 4) {
         break;
       }
@@ -268,7 +275,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   private void verifyNoPreemption() throws InterruptedException {
     // Sleep long enough to ensure not even one container is preempted.
-    for (int i = 0; i < 600; i++) {
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() != 8) {
         break;
       }
@@ -279,7 +286,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionWithinSameLeafQueue() throws Exception {
-    setupCluster();
     String queue = "root.preemptable.child-1";
     submitApps(queue, queue);
     if (fairsharePreemption) {
@@ -291,21 +297,18 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
     verifyPreemption();
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
     verifyPreemption();
   }
 
   @Test
   public void testNoPreemptionFromDisallowedQueue() throws Exception {
-    setupCluster();
     submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
     verifyNoPreemption();
   }
@@ -331,9 +334,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
-    setupCluster();
-
-    takeAllResource("root.preemptable.child-1");
+    takeAllResources("root.preemptable.child-1");
 
     setNumAMContainersPerNode(2);
     preemptHalfResources("root.preemptable.child-2");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java
new file mode 100644
index 0000000..07b8498
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestVisitedResourceRequestTracker {
+  private final ClusterNodeTracker<FSSchedulerNode>
+      nodeTracker = new ClusterNodeTracker<>();
+  private final ResourceRequest
+      anyRequest, rackRequest, node1Request, node2Request;
+
+  private final String NODE_VISITED = "The node is already visited. ";
+  private final String RACK_VISITED = "The rack is already visited. ";
+  private final String ANY_VISITED = "ANY is already visited. ";
"; + private final String NODE_FAILURE = "The node is visited again."; + private final String RACK_FAILURE = "The rack is visited again."; + private final String ANY_FAILURE = "ANY is visited again."; + private final String FIRST_CALL_FAILURE = "First call to visit failed."; + + public TestVisitedResourceRequestTracker() { + List rmNodes = + MockNodes.newNodes(1, 2, Resources.createResource(8192, 8)); + + FSSchedulerNode node1 = new FSSchedulerNode(rmNodes.get(0), false); + nodeTracker.addNode(node1); + node1Request = createRR(node1.getNodeName(), 1); + + FSSchedulerNode node2 = new FSSchedulerNode(rmNodes.get(1), false); + node2Request = createRR(node2.getNodeName(), 1); + nodeTracker.addNode(node2); + + anyRequest = createRR(ResourceRequest.ANY, 2); + rackRequest = createRR(node1.getRackName(), 2); + } + + private ResourceRequest createRR(String resourceName, int count) { + return ResourceRequest.newInstance( + Priority.UNDEFINED, resourceName, Resources.none(), count); + } + + @Test + public void testVisitAnyRequestFirst() { + VisitedResourceRequestTracker tracker = + new VisitedResourceRequestTracker(nodeTracker); + + // Visit ANY request first + assertTrue(FIRST_CALL_FAILURE, tracker.visit(anyRequest)); + + // All other requests should return false + assertFalse(ANY_VISITED + RACK_FAILURE, tracker.visit(rackRequest)); + assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node1Request)); + assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node2Request)); + } + + @Test + public void testVisitRackRequestFirst() { + VisitedResourceRequestTracker tracker = + new VisitedResourceRequestTracker(nodeTracker); + + // Visit rack request first + assertTrue(FIRST_CALL_FAILURE, tracker.visit(rackRequest)); + + // All other requests should return false + assertFalse(RACK_VISITED + ANY_FAILURE, tracker.visit(anyRequest)); + assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node1Request)); + assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node2Request)); + } + + @Test + public void testVisitNodeRequestFirst() { + VisitedResourceRequestTracker tracker = + new VisitedResourceRequestTracker(nodeTracker); + + // Visit node1 first + assertTrue(FIRST_CALL_FAILURE, tracker.visit(node1Request)); + + // Rack and ANY should return false + assertFalse(NODE_VISITED + ANY_FAILURE, tracker.visit(anyRequest)); + assertFalse(NODE_VISITED + RACK_FAILURE, tracker.visit(rackRequest)); + + // The other node should return true + assertTrue(NODE_VISITED + "Different node visit failed", + tracker.visit(node2Request)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org