Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EAD6A17790 for ; Mon, 30 Mar 2015 19:05:30 +0000 (UTC) Received: (qmail 81995 invoked by uid 500); 30 Mar 2015 19:05:24 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 81726 invoked by uid 500); 30 Mar 2015 19:05:24 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 81585 invoked by uid 99); 30 Mar 2015 19:05:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Mar 2015 19:05:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 203EADFAFF; Mon, 30 Mar 2015 19:05:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangda@apache.org To: common-commits@hadoop.apache.org Date: Mon, 30 Mar 2015 19:05:25 -0000 Message-Id: <2551d7d067294f9a96448cc19c25c757@git.apache.org> In-Reply-To: <7b922fb012024fd6b66855a87ca4ee2e@git.apache.org> References: <7b922fb012024fd6b66855a87ca4ee2e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: YARN-2495. Allow admin specify labels from each NM (Distributed configuration for node label). (Naganarasimha G R via wangda) YARN-2495. Allow admin specify labels from each NM (Distributed configuration for node label). (Naganarasimha G R via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2a945d24 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2a945d24 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2a945d24 Branch: refs/heads/trunk Commit: 2a945d24f7de1a7ae6e7bd6636188ce3b55c7f52 Parents: b804571 Author: Wangda Tan Authored: Mon Mar 30 12:04:51 2015 -0700 Committer: Wangda Tan Committed: Mon Mar 30 12:05:21 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 12 + .../src/main/proto/yarn_protos.proto | 4 + .../yarn/client/TestResourceTrackerOnHA.java | 2 +- .../protocolrecords/NodeHeartbeatRequest.java | 8 +- .../protocolrecords/NodeHeartbeatResponse.java | 3 + .../RegisterNodeManagerRequest.java | 12 + .../RegisterNodeManagerResponse.java | 3 + .../impl/pb/NodeHeartbeatRequestPBImpl.java | 37 ++ .../impl/pb/NodeHeartbeatResponsePBImpl.java | 13 + .../pb/RegisterNodeManagerRequestPBImpl.java | 48 ++- .../pb/RegisterNodeManagerResponsePBImpl.java | 13 + .../yarn_server_common_service_protos.proto | 4 + .../hadoop/yarn/TestYarnServerApiClasses.java | 94 ++++ .../yarn/server/nodemanager/NodeManager.java | 34 +- .../nodemanager/NodeStatusUpdaterImpl.java | 114 ++++- .../nodelabels/NodeLabelsProvider.java | 43 ++ .../nodemanager/TestNodeStatusUpdater.java | 2 +- .../TestNodeStatusUpdaterForLabels.java | 281 ++++++++++++ .../resourcemanager/ResourceTrackerService.java | 80 +++- .../TestResourceTrackerService.java | 430 ++++++++++++++++++- 21 files changed, 1199 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b38c9ac..f72d06d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -83,6 +83,9 @@ Release 2.8.0 - UNRELEASED YARN-3288. Document and fix indentation in the DockerContainerExecutor code + YARN-2495. Allow admin specify labels from each NM (Distributed + configuration for node label). (Naganarasimha G R via wangda) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a527af4..13e9a10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1719,6 +1719,18 @@ public class YarnConfiguration extends Configuration { public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX + "enabled"; public static final boolean DEFAULT_NODE_LABELS_ENABLED = false; + + public static final String NODELABEL_CONFIGURATION_TYPE = + NODE_LABELS_PREFIX + "configuration-type"; + + public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE = + "centralized"; + + public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE = + "distributed"; + + public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = + CENTALIZED_NODELABEL_CONFIGURATION_TYPE; public YarnConfiguration() { super(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 194be82..b396f4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -239,6 +239,10 @@ message NodeIdToLabelsProto { repeated string nodeLabels = 2; } +message StringArrayProto { + repeated string elements = 1; +} + message LabelsToNodeIdsProto { optional string nodeLabels = 1; repeated NodeIdProto nodeId = 2; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 8885769..8167a58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -70,7 +70,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{ NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, null, null); NodeHeartbeatRequest request2 = - NodeHeartbeatRequest.newInstance(status, null, null); + NodeHeartbeatRequest.newInstance(status, null, null,null); resourceTracker.nodeHeartbeat(request2); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index addd3fe..b80d9ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.Set; + import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -26,7 +28,7 @@ public abstract class NodeHeartbeatRequest { public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, - MasterKey lastKnownNMTokenMasterKey) { + MasterKey lastKnownNMTokenMasterKey, Set nodeLabels) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -34,6 +36,7 @@ public abstract class NodeHeartbeatRequest { .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + nodeHeartbeatRequest.setNodeLabels(nodeLabels); return nodeHeartbeatRequest; } @@ -45,4 +48,7 @@ public abstract class NodeHeartbeatRequest { public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); + + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 9fb44ca..1498a0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -67,4 +67,7 @@ public interface NodeHeartbeatResponse { void setSystemCredentialsForApps( Map systemCredentials); + + boolean getAreNodeLabelsAcceptedByRM(); + void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 366c32c..bf09b33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -31,6 +32,14 @@ public abstract class RegisterNodeManagerRequest { int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, List runningApplications) { + return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, + containerStatuses, runningApplications, null); + } + + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, + int httpPort, Resource resource, String nodeManagerVersionId, + List containerStatuses, + List runningApplications, Set nodeLabels) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -39,6 +48,7 @@ public abstract class RegisterNodeManagerRequest { request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); request.setRunningApplications(runningApplications); + request.setNodeLabels(nodeLabels); return request; } @@ -47,6 +57,8 @@ public abstract class RegisterNodeManagerRequest { public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getNMContainerStatuses(); + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); /** * We introduce this here because currently YARN RM doesn't persist nodes info http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index b20803f..c8678f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -45,4 +45,7 @@ public interface RegisterNodeManagerResponse { void setRMVersion(String version); String getRMVersion(); + + boolean getAreNodeLabelsAcceptedByRM(); + void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 26d1f19..16d47f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto; +import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; @@ -36,6 +41,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private NodeStatus nodeStatus = null; private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; + private Set labels = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -80,6 +86,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { builder.setLastKnownNmTokenMasterKey( convertToProtoFormat(this.lastKnownNMTokenMasterKey)); } + if (this.labels != null) { + builder.clearNodeLabels(); + builder.setNodeLabels(StringArrayProto.newBuilder() + .addAllElements(this.labels).build()); + } } private void mergeLocalToProto() { @@ -178,4 +189,30 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + + @Override + public Set getNodeLabels() { + initNodeLabels(); + return this.labels; + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + builder.clearNodeLabels(); + this.labels = nodeLabels; + } + + private void initNodeLabels() { + if (this.labels != null) { + return; + } + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeLabels()) { + labels = null; + return; + } + StringArrayProto nodeLabels = p.getNodeLabels(); + labels = new HashSet(nodeLabels.getElementsList()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 630a5bf..e27d8ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -483,5 +483,18 @@ public class NodeHeartbeatResponsePBImpl extends private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl) t).getProto(); } + + @Override + public boolean getAreNodeLabelsAcceptedByRM() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getAreNodeLabelsAcceptedByRM(); + } + + @Override + public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) { + maybeInitBuilder(); + this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index ce4faec..1d2bb82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,32 +20,27 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; - - public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); @@ -56,7 +51,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private NodeId nodeId = null; private List containerStatuses = null; private List runningApplications = null; - + private Set labels = null; + public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); } @@ -86,7 +82,11 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest if (this.nodeId != null) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } - + if (this.labels != null) { + builder.clearNodeLabels(); + builder.setNodeLabels(StringArrayProto.newBuilder() + .addAllElements(this.labels).build()); + } } private synchronized void addNMContainerStatusesToProto() { @@ -292,6 +292,32 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest builder.setNmVersion(version); } + @Override + public Set getNodeLabels() { + initNodeLabels(); + return this.labels; + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + builder.clearNodeLabels(); + this.labels = nodeLabels; + } + + private void initNodeLabels() { + if (this.labels != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeLabels()) { + labels=null; + return; + } + StringArrayProto nodeLabels = p.getNodeLabels(); + labels = new HashSet(nodeLabels.getElementsList()); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index ac329ed..391d00d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -216,4 +216,17 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase ()); + copy = new NodeHeartbeatRequestPBImpl( + original.getProto()); + Assert.assertNotNull(copy.getNodeLabels()); + Assert.assertEquals(0, copy.getNodeLabels().size()); + } + + /** + * Test NodeHeartbeatRequestPBImpl. + */ + @Test + public void testNodeHeartbeatRequestPBImplWithNullLabels() { + NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl(); + NodeHeartbeatRequestPBImpl copy = + new NodeHeartbeatRequestPBImpl(original.getProto()); + Assert.assertNull(copy.getNodeLabels()); } /** @@ -119,6 +154,16 @@ public class TestYarnServerApiClasses { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); + assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); + } + + @Test + public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() { + NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); + original.setAreNodeLabelsAcceptedByRM(true); + NodeHeartbeatResponsePBImpl copy = + new NodeHeartbeatResponsePBImpl(original.getProto()); + assertTrue(copy.getAreNodeLabelsAcceptedByRM()); } /** @@ -208,6 +253,55 @@ public class TestYarnServerApiClasses { } + @Test + public void testRegisterNodeManagerRequestWithNullLabels() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check labels are coming with no values + Assert.assertNull(request1.getNodeLabels()); + } + + @Test + public void testRegisterNodeManagerRequestWithValidLabels() { + HashSet nodeLabels = getValidNodeLabels(); + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, nodeLabels); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest copy = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check labels are coming with valid values + Assert.assertEquals(true, nodeLabels.containsAll(copy.getNodeLabels())); + + // check for empty labels + request.setNodeLabels(new HashSet ()); + copy = new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + Assert.assertNotNull(copy.getNodeLabels()); + Assert.assertEquals(0, copy.getNodeLabels().size()); + } + + private HashSet getValidNodeLabels() { + HashSet nodeLabels = new HashSet(); + nodeLabels.add("java"); + nodeLabels.add("windows"); + nodeLabels.add("gpu"); + nodeLabels.add("x86"); + return nodeLabels; + } + private ContainerStatus getContainerStatus(int applicationId, int containerID, int appAttemptId) { ContainerStatus status = recordFactory http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a4be120..f95a7ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -80,6 +81,7 @@ public class NodeManager extends CompositeService protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; + private NodeLabelsProvider nodeLabelsProvider; private LocalDirsHandlerService dirsHandler; private Context context; private AsyncDispatcher dispatcher; @@ -98,7 +100,22 @@ public class NodeManager extends CompositeService protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics); + metrics, nodeLabelsProvider); + } + + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProvider nodeLabelsProvider) { + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics, nodeLabelsProvider); + } + + @VisibleForTesting + protected NodeLabelsProvider createNodeLabelsProvider( + Configuration conf) throws IOException { + // TODO as part of YARN-2729 + // Need to get the implementation of provider service and return + return null; } protected NodeResourceMonitor createNodeResourceMonitor() { @@ -245,9 +262,18 @@ public class NodeManager extends CompositeService this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore); - - nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + + nodeLabelsProvider = createNodeLabelsProvider(conf); + + if (null == nodeLabelsProvider) { + nodeStatusUpdater = + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + } else { + addService(nodeLabelsProvider); + nodeStatusUpdater = + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker, + nodeLabelsProvider); + } NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 6ddd7e4..2549e0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; @@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -120,15 +123,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; Set pendingContainersToRemove = new HashSet(); + private final NodeLabelsProvider nodeLabelsProvider; + private final boolean hasNodeLabelsProvider; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + this(context, dispatcher, healthChecker, metrics, null); + } + + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProvider nodeLabelsProvider) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; + this.nodeLabelsProvider = nodeLabelsProvider; + this.hasNodeLabelsProvider = (nodeLabelsProvider != null); this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; - this.recentlyStoppedContainers = - new LinkedHashMap(); + this.recentlyStoppedContainers = new LinkedHashMap(); this.pendingCompletedContainers = new HashMap(); } @@ -253,22 +266,30 @@ public class NodeStatusUpdaterImpl extends AbstractService implements protected void registerWithRM() throws YarnException, IOException { List containerReports = getNMContainerStatuses(); + Set nodeLabels = null; + if (hasNodeLabelsProvider) { + nodeLabels = nodeLabelsProvider.getNodeLabels(); + nodeLabels = + (null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_STRING_SET + : nodeLabels; + } RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports, getRunningApplications()); + nodeManagerVersionId, containerReports, getRunningApplications(), + nodeLabels); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); this.rmIdentifier = regNMResponse.getRMIdentifier(); - // if the Resourcemanager instructs NM to shutdown. + // if the Resource Manager instructs NM to shutdown. if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) { String message = "Message from ResourceManager: " + regNMResponse.getDiagnosticsMessage(); throw new YarnRuntimeException( - "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, " + "Recieved SHUTDOWN signal from Resourcemanager, Registration of NodeManager failed, " + message); } @@ -306,8 +327,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.context.getNMTokenSecretManager().setMasterKey(masterKey); } - LOG.info("Registered with ResourceManager as " + this.nodeId - + " with total resource of " + this.totalResource); + StringBuilder successfullRegistrationMsg = new StringBuilder(); + successfullRegistrationMsg.append("Registered with ResourceManager as ") + .append(this.nodeId).append(" with total resource of ") + .append(this.totalResource); + + if (regNMResponse.getAreNodeLabelsAcceptedByRM()) { + successfullRegistrationMsg + .append(" and with following Node label(s) : {") + .append(StringUtils.join(",", nodeLabels)).append("}"); + } else if (hasNodeLabelsProvider) { + //case where provider is set but RM did not accept the Node Labels + LOG.error(regNMResponse.getDiagnosticsMessage()); + } + + LOG.info(successfullRegistrationMsg); LOG.info("Notifying ContainerManager to unblock new container-requests"); ((ContainerManagerImpl) this.context.getContainerManager()) .setBlockNewContainerRequests(false); @@ -580,19 +614,41 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Override @SuppressWarnings("unchecked") public void run() { - int lastHeartBeatID = 0; + int lastHeartbeatID = 0; + Set lastUpdatedNodeLabelsToRM = null; + if (hasNodeLabelsProvider) { + lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels(); + lastUpdatedNodeLabelsToRM = + (null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_STRING_SET + : lastUpdatedNodeLabelsToRM; + } while (!isStopped) { // Send heartbeat try { NodeHeartbeatResponse response = null; - NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID); - + Set nodeLabelsForHeartbeat = null; + NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); + + if (hasNodeLabelsProvider) { + nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels(); + //if the provider returns null then consider empty labels are set + nodeLabelsForHeartbeat = + (nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_STRING_SET + : nodeLabelsForHeartbeat; + if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat, + lastUpdatedNodeLabelsToRM)) { + //if nodelabels have not changed then no need to send + nodeLabelsForHeartbeat = null; + } + } + NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, - NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey(), - NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager() - .getCurrentKey()); + NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey(), + NodeStatusUpdaterImpl.this.context + .getNMTokenSecretManager().getCurrentKey(), + nodeLabelsForHeartbeat); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -623,6 +679,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements break; } + if (response.getAreNodeLabelsAcceptedByRM()) { + lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat; + LOG.info("Node Labels {" + + StringUtils.join(",", nodeLabelsForHeartbeat) + + "} were Accepted by RM "); + } else if (nodeLabelsForHeartbeat != null) { + // case where NodeLabelsProvider is set and updated labels were + // sent to RM and RM rejected the labels + LOG.error(response.getDiagnosticsMessage()); + } + // Explicitly put this method after checking the resync response. We // don't want to remove the completed containers before resync // because these completed containers will be reported back to RM @@ -631,7 +698,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); - lastHeartBeatID = response.getResponseId(); + lastHeartbeatID = response.getResponseId(); List containersToCleanup = response .getContainersToCleanup(); if (!containersToCleanup.isEmpty()) { @@ -680,6 +747,23 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } + /** + * Caller should take care of sending non null nodelabels for both + * arguments + * + * @param nodeLabelsNew + * @param nodeLabelsOld + * @return if the New node labels are diff from the older one. + */ + private boolean areNodeLabelsUpdated(Set nodeLabelsNew, + Set nodeLabelsOld) { + if (nodeLabelsNew.size() != nodeLabelsOld.size() + || !nodeLabelsOld.containsAll(nodeLabelsNew)) { + return true; + } + return false; + } + private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java new file mode 100644 index 0000000..4b34d76 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.util.Set; + +import org.apache.hadoop.service.AbstractService; + +/** + * Interface which will be responsible for fetching the labels + * + */ +public abstract class NodeLabelsProvider extends AbstractService { + + public NodeLabelsProvider(String name) { + super(name); + } + + /** + * Provides the labels. LabelProvider is expected to give same Labels + * continuously until there is a change in labels. + * If null is returned then Empty label set is assumed by the caller. + * + * @return Set of node label strings applicable for a node + */ + public abstract Set getNodeLabels(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 71a420e..fc404de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -1182,7 +1182,7 @@ public class TestNodeStatusUpdater { } }; verifyNodeStartFailure( - "Recieved SHUTDOWN signal from Resourcemanager ," + "Recieved SHUTDOWN signal from Resourcemanager, " + "Registration of NodeManager failed, " + "Message from ResourceManager: RM Shutting Down Node"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java new file mode 100644 index 0000000..437e4c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private NodeManager nm; + protected DummyNodeLabelsProvider dummyLabelsProviderRef; + + @Before + public void setup() { + dummyLabelsProviderRef = new DummyNodeLabelsProvider(); + } + + @After + public void tearDown() { + if (null != nm) { + ServiceOperations.stop(nm); + } + } + + private class ResourceTrackerForLabels implements ResourceTracker { + int heartbeatID = 0; + Set labels; + + private boolean receivedNMHeartbeat = false; + private boolean receivedNMRegister = false; + + private MasterKey createMasterKey() { + MasterKey masterKey = new MasterKeyPBImpl(); + masterKey.setKeyId(123); + masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123) + .byteValue() })); + return masterKey; + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, IOException { + labels = request.getNodeLabels(); + RegisterNodeManagerResponse response = + recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(NodeAction.NORMAL); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + response.setAreNodeLabelsAcceptedByRM(labels != null); + synchronized (ResourceTrackerForLabels.class) { + receivedNMRegister = true; + ResourceTrackerForLabels.class.notifyAll(); + } + return response; + } + + public void waitTillHeartbeat() { + if (receivedNMHeartbeat) { + return; + } + int i = 500; + while (!receivedNMHeartbeat && i > 0) { + synchronized (ResourceTrackerForLabels.class) { + if (!receivedNMHeartbeat) { + try { + System.out + .println("In ResourceTrackerForLabels waiting for heartbeat : " + + System.currentTimeMillis()); + ResourceTrackerForLabels.class.wait(500l); + // to avoid race condition, i.e. sendOutofBandHeartBeat can be + // sent before NSU thread has gone to sleep, hence we wait and try + // to resend heartbeat again + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + ResourceTrackerForLabels.class.wait(500l); + i--; + } catch (InterruptedException e) { + Assert.fail("Exception caught while waiting for Heartbeat"); + e.printStackTrace(); + } + } + } + } + if (!receivedNMHeartbeat) { + Assert.fail("Heartbeat dint receive even after waiting"); + } + } + + public void waitTillRegister() { + if (receivedNMRegister) { + return; + } + while (!receivedNMRegister) { + synchronized (ResourceTrackerForLabels.class) { + try { + ResourceTrackerForLabels.class.wait(); + } catch (InterruptedException e) { + Assert.fail("Exception caught while waiting for register"); + e.printStackTrace(); + } + } + } + } + + /** + * Flag to indicate received any + */ + public void resetNMHeartbeatReceiveFlag() { + synchronized (ResourceTrackerForLabels.class) { + receivedNMHeartbeat = false; + } + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnException, IOException { + System.out.println("RTS receive heartbeat : " + + System.currentTimeMillis()); + labels = request.getNodeLabels(); + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartbeatID++); + + NodeHeartbeatResponse nhResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse(heartbeatID, + NodeAction.NORMAL, null, null, null, null, 1000L); + + // to ensure that heartbeats are sent only when required. + nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE); + nhResponse.setAreNodeLabelsAcceptedByRM(labels != null); + + synchronized (ResourceTrackerForLabels.class) { + receivedNMHeartbeat = true; + ResourceTrackerForLabels.class.notifyAll(); + } + return nhResponse; + } + } + + public static class DummyNodeLabelsProvider extends NodeLabelsProvider { + + @SuppressWarnings("unchecked") + private Set nodeLabels = Collections.EMPTY_SET; + + public DummyNodeLabelsProvider() { + super(DummyNodeLabelsProvider.class.getName()); + } + + @Override + public synchronized Set getNodeLabels() { + return nodeLabels; + } + + synchronized void setNodeLabels(Set nodeLabels) { + this.nodeLabels = nodeLabels; + } + } + + private YarnConfiguration createNMConfigForDistributeNodeLabels() { + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, + YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE); + return conf; + } + + @Test + public void testNodeStatusUpdaterForNodeLabels() throws InterruptedException, + IOException { + final ResourceTrackerForLabels resourceTracker = + new ResourceTrackerForLabels(); + nm = new NodeManager() { + @Override + protected NodeLabelsProvider createNodeLabelsProvider( + Configuration conf) throws IOException { + return dummyLabelsProviderRef; + } + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProvider labelsProvider) { + + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics, labelsProvider) { + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override + protected void stopRMProxy() { + return; + } + }; + } + }; + + YarnConfiguration conf = createNMConfigForDistributeNodeLabels(); + nm.init(conf); + resourceTracker.resetNMHeartbeatReceiveFlag(); + nm.start(); + resourceTracker.waitTillRegister(); + assertCollectionEquals(resourceTracker.labels, + dummyLabelsProviderRef.getNodeLabels()); + + resourceTracker.waitTillHeartbeat();// wait till the first heartbeat + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat with updated labels + dummyLabelsProviderRef.setNodeLabels(toSet("P")); + + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertCollectionEquals(resourceTracker.labels, + dummyLabelsProviderRef.getNodeLabels()); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat without updating labels + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + resourceTracker.resetNMHeartbeatReceiveFlag(); + assertNull( + "If no change in labels then null should be sent as part of request", + resourceTracker.labels); + + // provider return with null labels + dummyLabelsProviderRef.setNodeLabels(null); + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartbeat(); + assertTrue("If provider sends null then empty labels should be sent", + resourceTracker.labels.isEmpty()); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + nm.stop(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 0de556b..22efe25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -31,6 +34,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -100,6 +104,8 @@ public class ResourceTrackerService extends AbstractService implements private int minAllocMb; private int minAllocVcores; + private boolean isDistributesNodeLabelsConf; + static { resync.setNodeAction(NodeAction.RESYNC); @@ -149,6 +155,14 @@ public class ResourceTrackerService extends AbstractService implements YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); + String nodeLabelConfigurationType = + conf.get(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, + YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE); + + isDistributesNodeLabelsConf = + YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE + .equals(nodeLabelConfigurationType); + super.serviceInit(conf); } @@ -336,11 +350,31 @@ public class ResourceTrackerService extends AbstractService implements } } - String message = - "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " - + httpPort + ") " + "registered with capability: " + capability - + ", assigned nodeId " + nodeId; - LOG.info(message); + // Update node's labels to RM's NodeLabelManager. + Set nodeLabels = request.getNodeLabels(); + if (isDistributesNodeLabelsConf && nodeLabels != null) { + try { + updateNodeLabelsFromNMReport(nodeLabels, nodeId); + response.setAreNodeLabelsAcceptedByRM(true); + } catch (IOException ex) { + // Ensure the exception is captured in the response + response.setDiagnosticsMessage(ex.getMessage()); + response.setAreNodeLabelsAcceptedByRM(false); + } + } + + StringBuilder message = new StringBuilder(); + message.append("NodeManager from node ").append(host).append("(cmPort: ") + .append(cmPort).append(" httpPort: "); + message.append(httpPort).append(") ") + .append("registered with capability: ").append(capability); + message.append(", assigned nodeId ").append(nodeId); + if (response.getAreNodeLabelsAcceptedByRM()) { + message.append(", node labels { ").append( + StringUtils.join(",", nodeLabels) + " } "); + } + + LOG.info(message.toString()); response.setNodeAction(NodeAction.NORMAL); response.setRMIdentifier(ResourceManager.getClusterTimeStamp()); response.setRMVersion(YarnVersionInfo.getVersion()); @@ -359,6 +393,7 @@ public class ResourceTrackerService extends AbstractService implements * 2. Check if it's a registered node * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat * 4. Send healthStatus to RMNode + * 5. Update node's labels if distributed Node Labels configuration is enabled */ NodeId nodeId = remoteNodeStatus.getNodeId(); @@ -428,9 +463,44 @@ public class ResourceTrackerService extends AbstractService implements remoteNodeStatus.getContainersStatuses(), remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + // 5. Update node's labels to RM's NodeLabelManager. + if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) { + try { + updateNodeLabelsFromNMReport(request.getNodeLabels(), nodeId); + nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true); + } catch (IOException ex) { + //ensure the error message is captured and sent across in response + nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage()); + nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false); + } + } + return nodeHeartBeatResponse; } + private void updateNodeLabelsFromNMReport(Set nodeLabels, + NodeId nodeId) throws IOException { + try { + Map> labelsUpdate = + new HashMap>(); + labelsUpdate.put(nodeId, nodeLabels); + this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate); + if (LOG.isDebugEnabled()) { + LOG.debug("Node Labels {" + StringUtils.join(",", nodeLabels) + + "} from Node " + nodeId + " were Accepted from RM"); + } + } catch (IOException ex) { + StringBuilder errorMessage = new StringBuilder(); + errorMessage.append("Node Labels {") + .append(StringUtils.join(",", nodeLabels)) + .append("} reported from NM with ID ").append(nodeId) + .append(" was rejected from RM with exception message as : ") + .append(ex.getMessage()); + LOG.error(errorMessage, ex); + throw new IOException(errorMessage.toString(), ex); + } + } + private void populateKeys(NodeHeartbeatRequest request, NodeHeartbeatResponse nodeHeartBeatResponse) {