Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6E54C19626 for ; Mon, 28 Mar 2016 18:13:08 +0000 (UTC) Received: (qmail 92754 invoked by uid 500); 28 Mar 2016 18:13:08 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 92685 invoked by uid 500); 28 Mar 2016 18:13:08 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 92673 invoked by uid 99); 28 Mar 2016 18:13:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 Mar 2016 18:13:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0CB16DFC73; Mon, 28 Mar 2016 18:13:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Message-Id: <00cc172478ee473693b360ed759c435d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-998. Keep NM resource updated through dynamic resource config for RM/NM restart. Contributed by Junping Du Date: Mon, 28 Mar 2016 18:13:08 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 5a552973f -> c7d843af3 YARN-998. Keep NM resource updated through dynamic resource config for RM/NM restart. 
Contributed by Junping Du Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c7d843af Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c7d843af Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c7d843af Branch: refs/heads/branch-2 Commit: c7d843af3b1adde602fe0335f52ffa28b0371bf4 Parents: 5a55297 Author: Jian He Authored: Mon Mar 28 11:12:33 2016 -0700 Committer: Jian He Committed: Mon Mar 28 11:13:02 2016 -0700 ---------------------------------------------------------------------- .../server/resourcemanager/AdminService.java | 40 +++++----- .../resourcemanager/ResourceTrackerService.java | 80 ++++++++++++++++---- .../resource/DynamicResourceConfiguration.java | 13 ++-- .../resourcemanager/TestRMAdminService.java | 78 ++++++++++++++++++- 4 files changed, 166 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d843af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index fbc2d6f..fc530e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -639,34 +639,32 @@ public class 
AdminService extends CompositeService implements try { Configuration conf = getConfig(); Configuration configuration = new Configuration(conf); - DynamicResourceConfiguration newconf; - - InputStream DRInputStream = - this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(configuration, - YarnConfiguration.DR_CONFIGURATION_FILE); - if (DRInputStream != null) { - configuration.addResource(DRInputStream); - newconf = new DynamicResourceConfiguration(configuration, false); + DynamicResourceConfiguration newConf; + + InputStream drInputStream = + this.rmContext.getConfigurationProvider().getConfigurationInputStream( + configuration, YarnConfiguration.DR_CONFIGURATION_FILE); + + if (drInputStream != null) { + newConf = new DynamicResourceConfiguration(configuration, + drInputStream); } else { - newconf = new DynamicResourceConfiguration(configuration, true); + newConf = new DynamicResourceConfiguration(configuration); } - if (newconf.getNodes() == null || newconf.getNodes().length == 0) { - RMAuditLogger.logSuccess(user.getShortUserName(), argName, - "AdminService"); - return response; - } else { + if (newConf.getNodes() != null && newConf.getNodes().length != 0) { Map<NodeId, ResourceOption> nodeResourceMap = - newconf.getNodeResourceMap(); - + newConf.getNodeResourceMap(); UpdateNodeResourceRequest updateRequest = - UpdateNodeResourceRequest.newInstance(nodeResourceMap); + UpdateNodeResourceRequest.newInstance(nodeResourceMap); updateNodeResource(updateRequest); - RMAuditLogger.logSuccess(user.getShortUserName(), argName, - "AdminService"); - return response; } + // refresh dynamic resource in ResourceTrackerService + this.rmContext.getResourceTrackerService(). 
+ updateDynamicResourceConfiguration(newConf); + RMAuditLogger.logSuccess(user.getShortUserName(), argName, + "AdminService"); + return response; } catch (IOException ioe) { throw logAndWrapException(ioe, user.getShortUserName(), argName, msg); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d843af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 902244b..b0bc565 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; +import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -105,6 +107,7 @@ public class ResourceTrackerService extends AbstractService implements private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; + private volatile DynamicResourceConfiguration drConf; public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, @@ -139,11 +142,11 @@ public class ResourceTrackerService extends AbstractService implements } minAllocMb = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); minAllocVcores = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); minimumNodeManagerVersion = conf.get( YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, @@ -156,9 +159,42 @@ public class ResourceTrackerService extends AbstractService implements YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf); } + loadDynamicResourceConfiguration(conf); + super.serviceInit(conf); } + /** + * Load DynamicResourceConfiguration from dynamic-resources.xml. 
+ * @param conf + * @throws IOException + */ + public void loadDynamicResourceConfiguration(Configuration conf) + throws IOException { + try { + // load dynamic-resources.xml + InputStream drInputStream = this.rmContext.getConfigurationProvider() + .getConfigurationInputStream(conf, + YarnConfiguration.DR_CONFIGURATION_FILE); + if (drInputStream != null) { + this.drConf = new DynamicResourceConfiguration(conf, drInputStream); + } else { + this.drConf = new DynamicResourceConfiguration(conf); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Update DynamicResourceConfiguration with new configuration. + * @param conf + */ + public void updateDynamicResourceConfiguration( + DynamicResourceConfiguration conf) { + this.drConf = conf; + } + @Override protected void serviceStart() throws Exception { super.serviceStart(); @@ -166,15 +202,14 @@ public class ResourceTrackerService extends AbstractService implements // security is enabled, so no secretManager. Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); - this.server = - rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress, - conf, null, - conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); - + this.server = rpc.getServer( + ResourceTracker.class, this, resourceTrackerAddress, conf, null, + conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); + // Enable service authorization? 
if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { InputStream inputStream = this.rmContext.getConfigurationProvider() @@ -185,12 +220,12 @@ public class ResourceTrackerService extends AbstractService implements } refreshServiceAcls(conf, RMPolicyProvider.getInstance()); } - + this.server.start(); conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, - YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, - server.getListenerAddress()); + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + server.getListenerAddress()); } @Override @@ -295,6 +330,19 @@ public class ResourceTrackerService extends AbstractService implements return response; } + // check if node's capacity is load from dynamic-resources.xml + String[] nodes = this.drConf.getNodes(); + String nid = nodeId.toString(); + + if (nodes != null && Arrays.asList(nodes).contains(nid)) { + capability.setMemory(this.drConf.getMemoryPerNode(nid)); + capability.setVirtualCores(this.drConf.getVcoresPerNode(nid)); + if (LOG.isDebugEnabled()) { + LOG.debug("Resource for node: " + nid + " is adjusted to " + + capability + " due to settings in dynamic-resources.xml."); + } + } + // Check if this node has minimum allocations if (capability.getMemory() < minAllocMb || capability.getVirtualCores() < minAllocVcores) { @@ -311,7 +359,7 @@ public class ResourceTrackerService extends AbstractService implements response.setContainerTokenMasterKey(containerTokenSecretManager .getCurrentKey()); response.setNMTokenMasterKey(nmTokenSecretManager - .getCurrentKey()); + .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, resolve(host), capability, nodeManagerVersion); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d843af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java index dd37801..045c7bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.resource; +import java.io.InputStream; + import java.util.HashMap; import java.util.Map; @@ -38,8 +40,6 @@ public class DynamicResourceConfiguration extends Configuration { private static final Log LOG = LogFactory.getLog(DynamicResourceConfiguration.class); - private static final String DR_CONFIGURATION_FILE = "dynamic-resources.xml"; - @Private public static final String PREFIX = "yarn.resource.dynamic."; @@ -63,15 +63,14 @@ public class DynamicResourceConfiguration extends Configuration { } public DynamicResourceConfiguration(Configuration configuration) { - this(configuration, true); + super(configuration); + addResource(YarnConfiguration.DR_CONFIGURATION_FILE); } public DynamicResourceConfiguration(Configuration configuration, - boolean useLocalConfigurationProvider) { + 
InputStream drInputStream) { super(configuration); - if (useLocalConfigurationProvider) { - addResource(DR_CONFIGURATION_FILE); - } + addResource(drInputStream); } private String getNodePrefix(String node) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d843af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 639b955..4513cbb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -27,7 +27,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -202,7 +204,7 @@ public class TestRMAdminService { } @Test - public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvider() + public void testRefreshNodesResourceWithFileSystemBasedConfigurationProvider() throws IOException, YarnException { configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); @@ -240,6 +242,75 @@ public class TestRMAdminService { } @Test + public void 
testResourcePersistentForNMRegistrationWithNewResource() + throws IOException, YarnException { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); + + //upload default configurations + uploadDefaultConfiguration(); + + try { + rm = new MockRM(configuration); + rm.init(configuration); + rm.start(); + rm.registerNode("h1:1234", 5120); + } catch(Exception ex) { + fail("Should not get any exceptions"); + } + + NodeId nid = ConverterUtils.toNodeId("h1:1234"); + RMNode ni = rm.getRMContext().getRMNodes().get(nid); + Resource resource = ni.getTotalCapability(); + Assert.assertEquals("<memory:5120, vCores:5>", resource.toString()); + + DynamicResourceConfiguration drConf = + new DynamicResourceConfiguration(); + drConf.set("yarn.resource.dynamic.nodes", "h1:1234"); + drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4"); + drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096"); + uploadConfiguration(drConf, "dynamic-resources.xml"); + + rm.adminService.refreshNodesResources( + RefreshNodesResourcesRequest.newInstance()); + + try { + // register the same node again with a different resource. + // validate this won't work as resource cached in RM side. + rm.registerNode("h1:1234", 8192, 8); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); + Resource resourceAfter = niAfter.getTotalCapability(); + Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString()); + + // Replace original dr file with an empty dr file, and validate node + // registration with new resources will take effective now. 
+ deleteOnRemoteFileSystem("dynamic-resources.xml"); + DynamicResourceConfiguration emptyDRConf = + new DynamicResourceConfiguration(); + + uploadConfiguration(emptyDRConf, "dynamic-resources.xml"); + rm.adminService.refreshNodesResources( + RefreshNodesResourcesRequest.newInstance()); + try { + // register the same node third time, this time the register resource + // should work. + rm.registerNode("h1:1234", 8192, 8); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + niAfter = rm.getRMContext().getRMNodes().get(nid); + resourceAfter = niAfter.getTotalCapability(); + // new resource in registration should take effective as we empty + // dynamic resource file already. + Assert.assertEquals("<memory:8192, vCores:8>", resourceAfter.toString()); + } + + @Test public void testAdminAclsWithLocalConfigurationProvider() { rm = new MockRM(configuration); rm.init(configuration); @@ -1006,6 +1077,11 @@ public class TestRMAdminService { uploadToRemoteFileSystem(new Path(csConfFile)); } + private void deleteOnRemoteFileSystem(String fileName) + throws IOException { + fs.delete(new Path(workingPath, fileName)); + } + private void uploadDefaultConfiguration() throws IOException { Configuration conf = new Configuration(); uploadConfiguration(conf, "core-site.xml");