Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AE83018848 for ; Fri, 5 Jun 2015 16:45:22 +0000 (UTC) Received: (qmail 27545 invoked by uid 500); 5 Jun 2015 16:45:22 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 27478 invoked by uid 500); 5 Jun 2015 16:45:22 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 27465 invoked by uid 99); 5 Jun 2015 16:45:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Jun 2015 16:45:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1B5E3E0E57; Fri, 5 Jun 2015 16:45:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kasha@apache.org To: common-commits@hadoop.apache.org Message-Id: <11c599574b644523a1ee8964db15dfd0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-3259. FairScheduler: Trigger fairShare updates on node events. (Anubhav Dhoot via kasha) Date: Fri, 5 Jun 2015 16:45:22 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 1a2e6e809 -> 429754de0 YARN-3259. FairScheduler: Trigger fairShare updates on node events. (Anubhav Dhoot via kasha) (cherry picked from commit 75885852cc19dd6de12e62498b112d5d70ce87f4) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/429754de Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/429754de Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/429754de Branch: refs/heads/branch-2 Commit: 429754de09cefc7dd94e7127c9ea1d314b6bd871 Parents: 1a2e6e8 Author: Karthik Kambatla Authored: Fri Jun 5 09:39:41 2015 -0700 Committer: Karthik Kambatla Committed: Fri Jun 5 09:42:12 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FSOpDurations.java | 6 + .../scheduler/fair/FairScheduler.java | 23 +++- .../scheduler/fair/TestSchedulingUpdate.java | 135 +++++++++++++++++++ 4 files changed, 163 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/429754de/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 67ce437..02872a5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -258,6 +258,9 @@ Release 2.8.0 - UNRELEASED YARN-3547. FairScheduler: Apps that have no resource demand should not participate scheduling. (Xianyin Xin via kasha) + YARN-3259. FairScheduler: Trigger fairShare updates on node events. + (Anubhav Dhoot via kasha) + BUG FIXES YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena http://git-wip-us.apache.org/repos/asf/hadoop/blob/429754de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java index c2282fd..20d2af9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.metrics2.MetricsCollector; @@ -116,4 +117,9 @@ public class FSOpDurations implements MetricsSource { public void addPreemptCallDuration(long value) { preemptCall.add(value); } + + @VisibleForTesting + public boolean hasUpdateThreadRunChanged() { + return updateThreadRun.changed(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/429754de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 07b3271..64b3f12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -103,9 +103,9 @@ import com.google.common.base.Preconditions; * of the root queue in the typical fair scheduling fashion. Then, the children * distribute the resources assigned to them to their children in the same * fashion. Applications may only be scheduled on leaf queues. Queues can be - * specified as children of other queues by placing them as sub-elements of their - * parents in the fair scheduler configuration file. - * + * specified as children of other queues by placing them as sub-elements of + * their parents in the fair scheduler configuration file. + * * A queue's name starts with the names of its parents, with periods as * separators. So a queue named "queue1" under the root named, would be * referred to as "root.queue1", and a queue named "queue2" under a queue @@ -142,6 +142,8 @@ public class FairScheduler extends @VisibleForTesting Thread updateThread; + private final Object updateThreadMonitor = new Object(); + @VisibleForTesting Thread schedulingThread; // timeout to join when we stop this service @@ -246,6 +248,13 @@ public class FairScheduler extends return queueMgr; } + // Allows UpdateThread to start processing without waiting till updateInterval + void triggerUpdate() { + synchronized (updateThreadMonitor) { + updateThreadMonitor.notify(); + } + } + /** * Thread which calls {@link FairScheduler#update()} every * updateInterval milliseconds. @@ -256,7 +265,9 @@ public class FairScheduler extends public void run() { while (!Thread.currentThread().isInterrupted()) { try { - Thread.sleep(updateInterval); + synchronized (updateThreadMonitor) { + updateThreadMonitor.wait(updateInterval); + } long start = getClock().getTime(); update(); preemptTasksIfNecessary(); @@ -838,6 +849,8 @@ public class FairScheduler extends updateRootQueueMetrics(); updateMaximumAllocation(schedulerNode, true); + triggerUpdate(); + queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); LOG.info("Added node " + node.getNodeAddress() + @@ -853,6 +866,8 @@ public class FairScheduler extends Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); updateRootQueueMetrics(); + triggerUpdate(); + // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/429754de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java new file mode 100644 index 0000000..94298f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestSchedulingUpdate extends FairSchedulerTestBase { + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + + // Make the update loop to never finish to ensure zero update calls + conf.setInt( + FairSchedulerConfiguration.UPDATE_INTERVAL_MS, + Integer.MAX_VALUE); + return conf; + } + + @Before + public void setup() { + conf = createConfiguration(); + resourceManager = new MockRM(conf); + resourceManager.start(); + + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + } + + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + } + + @Test (timeout = 3000) + public void testSchedulingUpdateOnNodeJoinLeave() throws InterruptedException { + + verifyNoCalls(); + + // Add one node + String host = "127.0.0.1"; + final int memory = 4096; + final int cores = 4; + RMNode node1 = MockNodes.newNodeInfo( + 1, Resources.createResource(memory, cores), 1, host); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + long expectedCalls = 1; + verifyExpectedCalls(expectedCalls, memory, cores); + + // Remove the node + NodeRemovedSchedulerEvent nodeEvent2 = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(nodeEvent2); + + expectedCalls = 2; + verifyExpectedCalls(expectedCalls, 0, 0); + } + + private void verifyExpectedCalls(long expectedCalls, int memory, int vcores) + throws InterruptedException { + boolean verified = false; + int count = 0; + while (count < 100) { + if (scheduler.fsOpDurations.hasUpdateThreadRunChanged()) { + break; + } + count++; + Thread.sleep(10); + } + assertTrue("Update Thread has not run based on its metrics", + scheduler.fsOpDurations.hasUpdateThreadRunChanged()); + assertEquals("Root queue metrics memory does not have expected value", + memory, scheduler.getRootQueueMetrics().getAvailableMB()); + assertEquals("Root queue metrics cpu does not have expected value", + vcores, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); + + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + scheduler.fsOpDurations.getMetrics(collector, true); + MetricsRecord record = collector.getRecords().get(0); + for (AbstractMetric abstractMetric : record.metrics()) { + if (abstractMetric.name().contains("UpdateThreadRunNumOps")) { + assertEquals("Update Thread did not run expected number of times " + + "based on metric record count", + expectedCalls, + abstractMetric.value()); + verified = true; + } + } + assertTrue("Did not find metric for UpdateThreadRunNumOps", verified); + } + + private void verifyNoCalls() { + assertFalse("Update thread should not have executed", + scheduler.fsOpDurations.hasUpdateThreadRunChanged()); + assertEquals("Scheduler queue memory should not have been updated", + 0, scheduler.getRootQueueMetrics().getAvailableMB()); + assertEquals("Scheduler queue cpu should not have been updated", + 0,scheduler.getRootQueueMetrics().getAvailableVirtualCores()); + } +}