Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B9E9E200BB0 for ; Sun, 30 Oct 2016 21:21:22 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B8B80160B05; Sun, 30 Oct 2016 20:21:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E5EA9160B04 for ; Sun, 30 Oct 2016 21:21:20 +0100 (CET) Received: (qmail 80862 invoked by uid 500); 30 Oct 2016 20:21:14 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 80842 invoked by uid 99); 30 Oct 2016 20:21:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 30 Oct 2016 20:21:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A77A8E2F01; Sun, 30 Oct 2016 20:21:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangda@apache.org To: common-commits@hadoop.apache.org Date: Sun, 30 Oct 2016 20:21:16 -0000 Message-Id: In-Reply-To: <9c0745f01f31436ab63f6b0e5de6a34f@git.apache.org> References: <9c0745f01f31436ab63f6b0e5de6a34f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/50] [abbrv] hadoop git commit: YARN-5799. Fix Opportunistic Allocation to set the correct value of Node Http Address. (asuresh) archived-at: Sun, 30 Oct 2016 20:21:22 -0000 YARN-5799. Fix Opportunistic Allocation to set the correct value of Node Http Address. 
(asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa3cab1e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa3cab1e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa3cab1e Branch: refs/heads/YARN-3368 Commit: aa3cab1eb29c56368d15882d7260a994e615e8d8 Parents: 1c8ab41 Author: Arun Suresh Authored: Sat Oct 29 02:03:57 2016 -0700 Committer: Arun Suresh Committed: Sat Oct 29 02:03:57 2016 -0700 ---------------------------------------------------------------------- .../DistributedSchedulingAllocateResponse.java | 6 +- ...RegisterDistributedSchedulingAMResponse.java | 6 +- .../server/api/protocolrecords/RemoteNode.java | 90 +++++++++++++ ...ributedSchedulingAllocateResponsePBImpl.java | 41 +++--- ...erDistributedSchedulingAMResponsePBImpl.java | 39 +++--- .../impl/pb/RemoteNodePBImpl.java | 135 +++++++++++++++++++ .../OpportunisticContainerAllocator.java | 24 ++-- .../OpportunisticContainerContext.java | 14 +- .../yarn_server_common_service_protos.proto | 9 +- .../yarn/server/nodemanager/NodeManager.java | 2 +- .../scheduler/DistributedScheduler.java | 3 +- .../scheduler/TestDistributedScheduler.java | 20 ++- ...pportunisticContainerAllocatorAMService.java | 30 ++++- ...pportunisticContainerAllocatorAMService.java | 6 +- 14 files changed, 343 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java index 7a40449..edc0cf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulingAllocateResponse.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; import java.util.List; @@ -58,9 +57,10 @@ public abstract class DistributedSchedulingAllocateResponse { @Public @Unstable - public abstract void setNodesForScheduling(List nodesForScheduling); + public abstract void setNodesForScheduling( + List nodesForScheduling); @Public @Unstable - public abstract List getNodesForScheduling(); + public abstract List getNodesForScheduling(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java index a0a0e38..f7d8df2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterDistributedSchedulingAMResponse.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; @@ -99,10 +98,11 @@ public abstract class RegisterDistributedSchedulingAMResponse { @Public @Unstable - public abstract void setNodesForScheduling(List nodesForScheduling); + public abstract void setNodesForScheduling( + List nodesForScheduling); @Public @Unstable - public abstract List getNodesForScheduling(); + public abstract List getNodesForScheduling(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java new file mode 100644 index 0000000..2b76257 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.util.Records; + +/** + * This class is used to encapsulate the {@link NodeId} as well as the HTTP + * address that can be used to communicate with the Node. + */ +@Private +@Unstable +public abstract class RemoteNode implements Comparable { + + /** + * Create new Instance. + * @param nodeId NodeId. + * @param httpAddress Http address. + * @return RemoteNode instance. 
+ */ + @Private + @Unstable + public static RemoteNode newInstance(NodeId nodeId, String httpAddress) { + RemoteNode remoteNode = Records.newRecord(RemoteNode.class); + remoteNode.setNodeId(nodeId); + remoteNode.setHttpAddress(httpAddress); + return remoteNode; + } + + /** + * Get {@link NodeId}. + * @return NodeId. + */ + @Private + @Unstable + public abstract NodeId getNodeId(); + + /** + * Set {@link NodeId}. + * @param nodeId NodeId. + */ + @Private + @Unstable + public abstract void setNodeId(NodeId nodeId); + + /** + * Get HTTP address. + * @return Http Address. + */ + @Private + @Unstable + public abstract String getHttpAddress(); + + /** + * Set HTTP address. + * @param httpAddress HTTP address. + */ + @Private + @Unstable + public abstract void setHttpAddress(String httpAddress); + + /** + * Use the underlying {@link NodeId} comparator. + * @param other RemoteNode. + * @return Comparison. + */ + @Override + public int compareTo(RemoteNode other) { + return this.getNodeId().compareTo(other.getNodeId()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java index 18d5073..8c48b61 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulingAllocateResponsePBImpl.java @@ -21,12 +21,13 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; + +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -45,7 +46,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends boolean viaProto = false; private AllocateResponse allocateResponse; - private List nodesForScheduling; + private List nodesForScheduling; public DistributedSchedulingAllocateResponsePBImpl() { builder = YarnServerCommonServiceProtos. 
@@ -86,8 +87,8 @@ public class DistributedSchedulingAllocateResponsePBImpl extends private synchronized void mergeLocalToBuilder() { if (this.nodesForScheduling != null) { builder.clearNodesForScheduling(); - Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable( - this.nodesForScheduling); + Iterable<YarnServerCommonServiceProtos.RemoteNodeProto> iterable = + getNodeIdProtoIterable(this.nodesForScheduling); builder.addAllNodesForScheduling(iterable); } if (this.allocateResponse != null) { @@ -123,7 +124,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends } @Override - public void setNodesForScheduling(List<NodeId> nodesForScheduling) { + public void setNodesForScheduling(List<RemoteNode> nodesForScheduling) { maybeInitBuilder(); if (nodesForScheduling == null || nodesForScheduling.isEmpty()) { if (this.nodesForScheduling != null) { @@ -137,7 +138,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends } @Override - public List<NodeId> getNodesForScheduling() { + public List<RemoteNode> getNodesForScheduling() { if (nodesForScheduling != null) { return nodesForScheduling; } @@ -149,24 +150,25 @@ public class DistributedSchedulingAllocateResponsePBImpl extends YarnServerCommonServiceProtos. DistributedSchedulingAllocateResponseProtoOrBuilder p = viaProto ? 
proto : builder; - List list = p.getNodesForSchedulingList(); + List list = + p.getNodesForSchedulingList(); nodesForScheduling = new ArrayList<>(); if (list != null) { - for (YarnProtos.NodeIdProto t : list) { - nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t)); + for (YarnServerCommonServiceProtos.RemoteNodeProto t : list) { + nodesForScheduling.add(new RemoteNodePBImpl(t)); } } } - private synchronized Iterable getNodeIdProtoIterable( - final List nodeList) { + private synchronized Iterable getNodeIdProtoIterable( + final List nodeList) { maybeInitBuilder(); - return new Iterable() { + return new Iterable() { @Override - public synchronized Iterator iterator() { - return new Iterator() { + public synchronized Iterator iterator() { + return new Iterator() { - Iterator iter = nodeList.iterator(); + Iterator iter = nodeList.iterator(); @Override public boolean hasNext() { @@ -174,8 +176,8 @@ public class DistributedSchedulingAllocateResponsePBImpl extends } @Override - public YarnProtos.NodeIdProto next() { - return ProtoUtils.convertToProtoFormat(iter.next()); + public RemoteNodeProto next() { + return ((RemoteNodePBImpl)iter.next()).getProto(); } @Override @@ -186,5 +188,4 @@ public class DistributedSchedulingAllocateResponsePBImpl extends } }; } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java index 4aaf99c..41b2a4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterDistributedSchedulingAMResponsePBImpl.java @@ -23,13 +23,15 @@ import com.google.protobuf.TextFormat; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; + +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -52,7 +54,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends private Resource maxContainerResource; private Resource minContainerResource; private Resource incrContainerResource; - private List nodesForScheduling; + private List nodesForScheduling; private RegisterApplicationMasterResponse registerApplicationMasterResponse; public RegisterDistributedSchedulingAMResponsePBImpl() { @@ -95,8 +97,8 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends 
private synchronized void mergeLocalToBuilder() { if (this.nodesForScheduling != null) { builder.clearNodesForScheduling(); - Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable( - this.nodesForScheduling); + Iterable<YarnServerCommonServiceProtos.RemoteNodeProto> iterable = + getNodeIdProtoIterable(this.nodesForScheduling); builder.addAllNodesForScheduling(iterable); } if (this.maxContainerResource != null) { @@ -261,7 +263,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends } @Override - public void setNodesForScheduling(List<NodeId> nodesForScheduling) { + public void setNodesForScheduling(List<RemoteNode> nodesForScheduling) { maybeInitBuilder(); if (nodesForScheduling == null || nodesForScheduling.isEmpty()) { if (this.nodesForScheduling != null) { @@ -275,7 +277,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends } @Override - public List<NodeId> getNodesForScheduling() { + public List<RemoteNode> getNodesForScheduling() { if (nodesForScheduling != null) { return nodesForScheduling; } @@ -287,24 +289,25 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends YarnServerCommonServiceProtos. RegisterDistributedSchedulingAMResponseProtoOrBuilder p = viaProto ? 
proto : builder; - List list = p.getNodesForSchedulingList(); + List list = + p.getNodesForSchedulingList(); nodesForScheduling = new ArrayList<>(); if (list != null) { - for (YarnProtos.NodeIdProto t : list) { - nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t)); + for (YarnServerCommonServiceProtos.RemoteNodeProto t : list) { + nodesForScheduling.add(new RemoteNodePBImpl(t)); } } } - private synchronized Iterable getNodeIdProtoIterable( - final List nodeList) { + private synchronized Iterable getNodeIdProtoIterable( + final List nodeList) { maybeInitBuilder(); - return new Iterable() { + return new Iterable() { @Override - public synchronized Iterator iterator() { - return new Iterator() { + public synchronized Iterator iterator() { + return new Iterator() { - Iterator iter = nodeList.iterator(); + Iterator iter = nodeList.iterator(); @Override public boolean hasNext() { @@ -312,8 +315,8 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends } @Override - public YarnProtos.NodeIdProto next() { - return ProtoUtils.convertToProtoFormat(iter.next()); + public RemoteNodeProto next() { + return ((RemoteNodePBImpl)iter.next()).getProto(); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java new file mode 100644 index 0000000..3e4fd4a --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; + +/** + * Implementation of {@link RemoteNode}. 
+ */ +public class RemoteNodePBImpl extends RemoteNode { + + private RemoteNodeProto proto = RemoteNodeProto.getDefaultInstance(); + private RemoteNodeProto.Builder builder = null; + private boolean viaProto = false; + + private NodeId nodeId = null; + + public RemoteNodePBImpl() { + builder = RemoteNodeProto.newBuilder(); + } + + public RemoteNodePBImpl(RemoteNodeProto proto) { + this.proto = proto; + viaProto = true; + } + + public RemoteNodeProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.nodeId != null + && !((NodeIdPBImpl) nodeId).getProto().equals( + builder.getNodeId())) { + builder.setNodeId(ProtoUtils.convertToProtoFormat(this.nodeId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RemoteNodeProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public NodeId getNodeId() { + RemoteNodeProtoOrBuilder p = viaProto ? proto : builder; + if (this.nodeId != null) { + return this.nodeId; + } + if (!p.hasNodeId()) { + return null; + } + this.nodeId = ProtoUtils.convertFromProtoFormat(p.getNodeId()); + return this.nodeId; + } + + @Override + public void setNodeId(NodeId nodeId) { + maybeInitBuilder(); + if (nodeId == null) { + builder.clearNodeId(); + } + this.nodeId = nodeId; + } + + @Override + public String getHttpAddress() { + RemoteNodeProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasHttpAddress()) { + return null; + } + return (p.getHttpAddress()); + } + + @Override + public void setHttpAddress(String httpAddress) { + maybeInitBuilder(); + if (httpAddress == null) { + builder.clearHttpAddress(); + return; + } + builder.setHttpAddress(httpAddress); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 9c158e9..4410db1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import 
org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -174,17 +175,14 @@ public class OpportunisticContainerAllocator { new DominantResourceCalculator(); private final BaseContainerTokenSecretManager tokenSecretManager; - private int webpagePort; /** * Create a new Opportunistic Container Allocator. * @param tokenSecretManager TokenSecretManager - * @param webpagePort Webpage Port */ public OpportunisticContainerAllocator( - BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) { + BaseContainerTokenSecretManager tokenSecretManager) { this.tokenSecretManager = tokenSecretManager; - this.webpagePort = webpagePort; } /** @@ -271,15 +269,15 @@ public class OpportunisticContainerAllocator { private void allocateContainersInternal(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, Set blacklist, ApplicationAttemptId id, - Map allNodes, String userName, + Map allNodes, String userName, Map> containers, ResourceRequest anyAsk) throws YarnException { int toAllocate = anyAsk.getNumContainers() - (containers.isEmpty() ? 0 : containers.get(anyAsk.getCapability()).size()); - List nodesForScheduling = new ArrayList<>(); - for (Entry nodeEntry : allNodes.entrySet()) { + List nodesForScheduling = new ArrayList<>(); + for (Entry nodeEntry : allNodes.entrySet()) { // Do not use blacklisted nodes for scheduling. 
if (blacklist.contains(nodeEntry.getKey())) { continue; @@ -295,9 +293,9 @@ public class OpportunisticContainerAllocator { for (int numCont = 0; numCont < toAllocate; numCont++) { nextNodeToSchedule++; nextNodeToSchedule %= nodesForScheduling.size(); - NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule); + RemoteNode node = nodesForScheduling.get(nextNodeToSchedule); Container container = buildContainer(rmIdentifier, appParams, idCounter, - anyAsk, id, userName, nodeId); + anyAsk, id, userName, node); List cList = containers.get(anyAsk.getCapability()); if (cList == null) { cList = new ArrayList<>(); @@ -313,7 +311,7 @@ public class OpportunisticContainerAllocator { private Container buildContainer(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, ResourceRequest rr, ApplicationAttemptId id, String userName, - NodeId nodeId) throws YarnException { + RemoteNode node) throws YarnException { ContainerId cId = ContainerId.newContainerId(id, idCounter.generateContainerId()); @@ -324,7 +322,7 @@ public class OpportunisticContainerAllocator { long currTime = System.currentTimeMillis(); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier( - cId, 0, nodeId.getHost() + ":" + nodeId.getPort(), userName, + cId, 0, node.getNodeId().toString(), userName, capability, currTime + appParams.containerTokenExpiryInterval, tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, rr.getPriority(), currTime, @@ -332,10 +330,10 @@ public class OpportunisticContainerAllocator { ExecutionType.OPPORTUNISTIC); byte[] pwd = tokenSecretManager.createPassword(containerTokenIdentifier); - Token containerToken = newContainerToken(nodeId, pwd, + Token containerToken = newContainerToken(node.getNodeId(), pwd, containerTokenIdentifier); Container container = BuilderUtils.newContainer( - cId, nodeId, nodeId.getHost() + ":" + webpagePort, + cId, node.getNodeId(), node.getHttpAddress(), capability, rr.getPriority(), containerToken, 
containerTokenIdentifier.getExecutionType(), rr.getAllocationRequestId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 6fcddf8..725e2d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -23,10 +23,10 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +60,8 @@ public class OpportunisticContainerContext { private ContainerIdGenerator containerIdGenerator = new ContainerIdGenerator(); - private volatile List nodeList = new LinkedList<>(); - private final Map nodeMap = new LinkedHashMap<>(); + private volatile List nodeList = new LinkedList<>(); + private final 
Map nodeMap = new LinkedHashMap<>(); private final Set blacklist = new HashSet<>(); @@ -89,11 +89,11 @@ public class OpportunisticContainerContext { this.containerIdGenerator = containerIdGenerator; } - public Map getNodeMap() { + public Map getNodeMap() { return Collections.unmodifiableMap(nodeMap); } - public synchronized void updateNodeList(List newNodeList) { + public synchronized void updateNodeList(List newNodeList) { // This is an optimization for centralized placement. The // OppContainerAllocatorAMService has a cached list of nodes which it sets // here. The nodeMap needs to be updated only if the backing node list is @@ -101,8 +101,8 @@ public class OpportunisticContainerContext { if (newNodeList != nodeList) { nodeList = newNodeList; nodeMap.clear(); - for (NodeId n : nodeList) { - nodeMap.put(n.getHost(), n); + for (RemoteNode n : nodeList) { + nodeMap.put(n.getNodeId().getHost(), n); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d485e6b..4350fc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -26,6 +26,11 @@ import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; import "yarn_service_protos.proto"; +message RemoteNodeProto { + optional NodeIdProto node_id = 1; + optional string http_address = 2; +} + 
message RegisterDistributedSchedulingAMResponseProto { optional RegisterApplicationMasterResponseProto register_response = 1; optional ResourceProto max_container_resource = 2; @@ -33,12 +38,12 @@ message RegisterDistributedSchedulingAMResponseProto { optional ResourceProto incr_container_resource = 4; optional int32 container_token_expiry_interval = 5; optional int64 container_id_start = 6; - repeated NodeIdProto nodes_for_scheduling = 7; + repeated RemoteNodeProto nodes_for_scheduling = 7; } message DistributedSchedulingAllocateResponseProto { optional AllocateResponseProto allocate_response = 1; - repeated NodeIdProto nodes_for_scheduling = 2; + repeated RemoteNodeProto nodes_for_scheduling = 2; } message DistributedSchedulingAllocateRequestProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 5424464..0f0a081 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -374,7 +374,7 @@ public class NodeManager extends CompositeService ((NMContext) context).setQueueableContainerAllocator( new OpportunisticContainerAllocator( - context.getContainerTokenSecretManager(), webServer.getPort())); + context.getContainerTokenSecretManager())); 
dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index 8a40337..a12d16a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -198,7 +199,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { setNodeList(registerResponse.getNodesForScheduling()); } - private void setNodeList(List nodeList) { + private void setNodeList(List nodeList) { 
oppContainerContext.updateNodeList(nodeList); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java index 8f1ae7f..736dc31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; @@ -74,7 +75,8 @@ public class TestDistributedScheduler { RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler); registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList( - NodeId.newInstance("a", 1), 
NodeId.newInstance("b", 2))); + RemoteNode.newInstance(NodeId.newInstance("a", 1), "http://a:1"), + RemoteNode.newInstance(NodeId.newInstance("b", 2), "http://b:2"))); final AtomicBoolean flipFlag = new AtomicBoolean(true); Mockito.when( @@ -87,10 +89,16 @@ public class TestDistributedScheduler { flipFlag.set(!flipFlag.get()); if (flipFlag.get()) { return createAllocateResponse(Arrays.asList( - NodeId.newInstance("c", 3), NodeId.newInstance("d", 4))); + RemoteNode.newInstance( + NodeId.newInstance("c", 3), "http://c:3"), + RemoteNode.newInstance( + NodeId.newInstance("d", 4), "http://d:4"))); } else { return createAllocateResponse(Arrays.asList( - NodeId.newInstance("d", 4), NodeId.newInstance("c", 3))); + RemoteNode.newInstance( + NodeId.newInstance("d", 4), "http://d:4"), + RemoteNode.newInstance( + NodeId.newInstance("c", 3), "http://c:3"))); } } }); @@ -164,7 +172,7 @@ public class TestDistributedScheduler { } private void registerAM(DistributedScheduler distributedScheduler, - RequestInterceptor finalReqIntcptr, List nodeList) + RequestInterceptor finalReqIntcptr, List nodeList) throws Exception { RegisterDistributedSchedulingAMResponse distSchedRegisterResponse = Records.newRecord(RegisterDistributedSchedulingAMResponse.class); @@ -208,7 +216,7 @@ public class TestDistributedScheduler { }; nmContainerTokenSecretManager.setMasterKey(mKey); OpportunisticContainerAllocator containerAllocator = - new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77); + new OpportunisticContainerAllocator(nmContainerTokenSecretManager); NMTokenSecretManagerInNM nmTokenSecretManagerInNM = new NMTokenSecretManagerInNM(); @@ -236,7 +244,7 @@ public class TestDistributedScheduler { } private DistributedSchedulingAllocateResponse createAllocateResponse( - List nodes) { + List nodes) { DistributedSchedulingAllocateResponse distSchedAllocateResponse = Records.newRecord(DistributedSchedulingAllocateResponse.class); distSchedAllocateResponse 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index a7c0a50..815d29d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; + +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -74,6 +76,7 @@ import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; /** @@ -97,7 +100,7 @@ public class OpportunisticContainerAllocatorAMService private final int k; private final long 
cacheRefreshInterval; - private List cachedNodeIds; + private List cachedNodes; private long lastCacheUpdateTime; public OpportunisticContainerAllocatorAMService(RMContext rmContext, @@ -105,7 +108,7 @@ public class OpportunisticContainerAllocatorAMService super(OpportunisticContainerAllocatorAMService.class.getName(), rmContext, scheduler); this.oppContainerAllocator = new OpportunisticContainerAllocator( - rmContext.getContainerTokenSecretManager(), 0); + rmContext.getContainerTokenSecretManager()); this.k = rmContext.getYarnConfiguration().getInt( YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT); @@ -372,14 +375,29 @@ public class OpportunisticContainerAllocatorAMService ); } - private synchronized List getLeastLoadedNodes() { + private synchronized List getLeastLoadedNodes() { long currTime = System.currentTimeMillis(); if ((currTime - lastCacheUpdateTime > cacheRefreshInterval) - || cachedNodeIds == null) { - cachedNodeIds = this.nodeMonitor.selectLeastLoadedNodes(this.k); + || cachedNodes == null) { + cachedNodes = convertToRemoteNodes( + this.nodeMonitor.selectLeastLoadedNodes(this.k)); lastCacheUpdateTime = currTime; } - return cachedNodeIds; + return cachedNodes; + } + + private List convertToRemoteNodes(List nodeIds) { + ArrayList retNodes = new ArrayList<>(); + for (NodeId nId : nodeIds) { + retNodes.add(convertToRemoteNode(nId)); + } + return retNodes; + } + + private RemoteNode convertToRemoteNode(NodeId nodeId) { + return RemoteNode.newInstance(nodeId, + ((AbstractYarnScheduler)rmContext.getScheduler()).getNode(nodeId) + .getHttpAddress()); } private Resource createMaxContainerResource() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa3cab1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 207f5ba..3154d26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl; @@ -190,7 +191,7 @@ public class TestOpportunisticContainerAllocatorAMService { dsProxy.allocateForDistributedScheduling(null, distAllReq.getProto())); Assert.assertEquals( - "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); + "h1", dsAllocResp.getNodesForScheduling().get(0).getNodeId().getHost()); FinishApplicationMasterResponse dsfinishResp = new FinishApplicationMasterResponsePBImpl( @@ -269,7 +270,8 @@ public class 
TestOpportunisticContainerAllocatorAMService { DistributedSchedulingAllocateResponse resp = factory .newRecordInstance(DistributedSchedulingAllocateResponse.class); resp.setNodesForScheduling( - Arrays.asList(NodeId.newInstance("h1", 1234))); + Arrays.asList(RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "http://h1:4321"))); return resp; } }; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org