From: acmurthy@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r1170879 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ hadoop-yarn/hadoop-yarn-server...
Date: Wed, 14 Sep 2011 22:46:57 -0000
Message-Id: <20110914224658.24284238890A@eris.apache.org>

Author: acmurthy
Date: Wed Sep 14 22:46:57 2011
New Revision: 1170879

URL: http://svn.apache.org/viewvc?rev=1170879&view=rev
Log:
MAPREDUCE-3005. Fix both FifoScheduler and CapacityScheduler to correctly enforce locality constraints.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1170879&r1=1170878&r2=1170879&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Sep 14 22:46:57 2011
@@ -1324,6 +1324,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2949. Fixed NodeManager to shut-down correctly if a service
     startup fails. (Ravi Teja via vinodkv)
 
+    MAPREDUCE-3005. Fix both FifoScheduler and CapacityScheduler to correctly
+    enforce locality constraints. (acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
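Before the per-file diffs, it may help to sketch the failure mode the log message refers to. In both schedulers, an application's resource requests are tracked at three levels (specific host, rack, and ANY/off-switch), and a node-local allocation decrements all three together; the bug was consulting only the finest-grained level when deciding whether a node still needs a container. The sketch below is illustration only -- AskTable and its methods are hypothetical stand-ins, not YARN classes:

    // Hypothetical bookkeeping sketch (not the YARN API): shows how a
    // host-level ask can remain positive after rack/ANY demand is satisfied.
    import java.util.HashMap;
    import java.util.Map;

    final class AskTable {
      private final Map<String, Integer> asks = new HashMap<String, Integer>();

      void ask(String level, int n) {
        asks.put(level, outstanding(level) + n);
      }

      int outstanding(String level) {
        Integer n = asks.get(level);
        return (n == null) ? 0 : n;
      }

      /** A node-local allocation decrements every level of the request. */
      void allocateNodeLocal(String host, String rack) {
        ask(host, -1);
        ask(rack, -1);
        ask("*", -1); // "*" stands in for the ANY/off-switch level
      }

      public static void main(String[] args) {
        AskTable t = new AskTable();
        t.ask("host_0_0", 1); t.ask("host_0_1", 1);
        t.ask("rack_0", 1);   t.ask("*", 1);
        t.allocateNodeLocal("host_0_0", "rack_0");
        // host_0_1 still shows demand, but rack_0 and ANY are satisfied; a
        // scheduler that checks only host_0_1 would wrongly allocate again.
        System.out.println(t.outstanding("host_0_1")); // 1
        System.out.println(t.outstanding("rack_0"));   // 0
        System.out.println(t.outstanding("*"));        // 0
      }
    }

Both fixes below amount to consulting the coarser levels before honoring a finer-grained request.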
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1170879&r1=1170878&r2=1170879&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Sep 14 22:46:57 2011
@@ -1023,21 +1023,17 @@ public class LeafQueue implements CSQueu
     // Check if we need containers on this rack
     ResourceRequest rackLocalRequest = 
       application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+      return false;
+    }
+    
+    // If we are here, we do need containers on this rack for RACK_LOCAL req
     if (type == NodeType.RACK_LOCAL) {
-      if (rackLocalRequest == null) {
-        return false;
-      } else {
-        return rackLocalRequest.getNumContainers() > 0;
-      }
+      return true;
     }
     
     // Check if we need containers on this host
     if (type == NodeType.NODE_LOCAL) {
-      // First: Do we need containers on this rack?
-      if (rackLocalRequest != null && rackLocalRequest.getNumContainers() == 0) {
-        return false;
-      }
-      
       // Now check if we need containers on this host...
       ResourceRequest nodeLocalRequest = 
         application.getResourceRequest(priority, node.getHostName());
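The shape of the LeafQueue change is the interesting part: the rack-level check moves out of the two branches into a single early return, so both the RACK_LOCAL and the NODE_LOCAL paths inherit it, and a null or zero rack request can no longer slip past the node-local branch. A minimal sketch of the resulting control flow, with the ResourceRequest plumbing reduced to plain counts (needContainers here is a hypothetical stand-in, not the committed method):

    // Control-flow sketch of the fixed locality check (simplified).
    final class LeafQueueFlowSketch {

      enum NodeType { NODE_LOCAL, RACK_LOCAL }

      static boolean needContainers(int rackAsk, int nodeAsk, NodeType type) {
        // Hoisted guard: without rack-level demand, neither branch may allocate.
        if (rackAsk <= 0) {
          return false;
        }
        if (type == NodeType.RACK_LOCAL) {
          return true; // rack demand was confirmed above
        }
        // NODE_LOCAL additionally requires host-level demand.
        return nodeAsk > 0;
      }

      public static void main(String[] args) {
        // A stale host-level ask no longer wins once rack demand is gone:
        System.out.println(needContainers(0, 1, NodeType.NODE_LOCAL)); // false
      }
    }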
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1170879&r1=1170878&r2=1170879&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Sep 14 22:46:57 2011
@@ -289,6 +289,7 @@ public class FifoScheduler implements Re
     return nodes.get(nodeId);
   }
 
+  @SuppressWarnings("unchecked")
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
     // TODO: Fix store
@@ -440,6 +441,14 @@ public class FifoScheduler implements Re
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
     if (request != null) {
+      // Don't allocate on this node if we don't need containers on this rack
+      ResourceRequest rackRequest = 
+          application.getResourceRequest(priority, 
+              node.getRMNode().getRackName());
+      if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
+        return 0;
+      }
+      
       int assignableContainers = 
         Math.min(
             getMaxAllocatableContainers(application, priority, node, 
@@ -458,6 +467,13 @@ public class FifoScheduler implements Re
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRMNode().getRackName());
     if (request != null) {
+      // Don't allocate on this rack if the application doesn't need containers
+      ResourceRequest offSwitchRequest = 
+          application.getResourceRequest(priority, SchedulerNode.ANY);
+      if (offSwitchRequest.getNumContainers() <= 0) {
+        return 0;
+      }
+      
       int assignableContainers = 
         Math.min(
             getMaxAllocatableContainers(application, priority, node, 
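The FifoScheduler half of the patch applies the same idea as two symmetric guards: node-local assignment is gated on the rack-level request, and rack-local assignment is gated on the off-switch (ANY) request. Reduced to its essence (Asks and the assign* methods below are stand-ins, not the scheduler's real signatures):

    // Sketch of the two guards added to FifoScheduler (hypothetical types).
    final class FifoGuardsSketch {

      /** Outstanding container counts per request level. */
      static final class Asks {
        final int node, rack, any;
        Asks(int node, int rack, int any) {
          this.node = node; this.rack = rack; this.any = any;
        }
      }

      static int assignNodeLocalContainers(Asks asks) {
        // Don't allocate on this node if the rack no longer needs containers.
        if (asks.rack <= 0) {
          return 0;
        }
        return asks.node;
      }

      static int assignRackLocalContainers(Asks asks) {
        // Don't allocate on this rack if nothing is needed anywhere (ANY == 0).
        if (asks.any <= 0) {
          return 0;
        }
        return asks.rack;
      }

      public static void main(String[] args) {
        Asks stale = new Asks(1, 0, 0); // stale node ask; rack/ANY satisfied
        System.out.println(assignNodeLocalContainers(stale)); // 0: skipped
      }
    }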
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1170879&r1=1170878&r2=1170879&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Wed Sep 14 22:46:57 2011
@@ -625,7 +625,6 @@ public class TestLeafQueue {
   }
 
-  
   @Test
   public void testLocalityScheduling() throws Exception {
 
@@ -876,6 +875,107 @@ public class TestLeafQueue {
   }
 
+  @Test
+  public void testSchedulingConstraints() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+    
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 = 
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+    a.submitApplication(app_0, user_0, A);
+    
+    // Setup some nodes and racks
+    String host_0_0 = "host_0_0";
+    String rack_0 = "rack_0";
+    SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
+    String host_0_1 = "host_0_1";
+    SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
+    
+    String host_1_0 = "host_1_0";
+    String rack_1 = "rack_1";
+    SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+    
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests and submit
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start testing...
+    
+    // Add one request
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // NODE_LOCAL - node_0_0
+    a.assignContainers(clusterResource, node_0_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+    // No allocation on node_1_0 even though it's node/rack local since
+    // required(ANY) == 0
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
+                                                                 // since #req=0
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+    
+    // Add one request
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // No allocation on node_0_1 even though it's node/rack local since
+    // required(rack_0) == 0
+    a.assignContainers(clusterResource, node_0_1);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(1, app_0.getTotalRequiredResources(priority));
+
+    // NODE_LOCAL - node_1_0
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+  }
+
   @After
   public void tearDown() throws Exception {
   }
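The test reads most easily if one tracks the outstanding requests level by level, assuming the bookkeeping sketched earlier (each NODE_LOCAL allocation decrements the matching host, rack, and ANY requests together):

    outstanding asks          host_0_0  host_0_1  host_1_0  rack_0  rack_1  ANY
    after setup + first ANY       1         1         1        1       1     1
    allocate on node_0_0          0         1         1        0       1     0
    node_1_0: skipped, ANY == 0
    re-add one ANY                0         1         1        0       1     1
    node_0_1: skipped, rack_0 == 0 (even though host_0_1 == 1)
    allocate on node_1_0          0         1         0        0       0     0

The two skipped rows are exactly the stale-request cases the new rack and ANY guards exist to catch.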