Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 12B95200CF3 for ; Wed, 30 Aug 2017 08:13:35 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 112F116868B; Wed, 30 Aug 2017 06:13:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 98305168685 for ; Wed, 30 Aug 2017 08:13:32 +0200 (CEST) Received: (qmail 41606 invoked by uid 500); 30 Aug 2017 06:13:23 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 39549 invoked by uid 99); 30 Aug 2017 06:13:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Aug 2017 06:13:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E8981F5FE6; Wed, 30 Aug 2017 06:13:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: varunsaxena@apache.org To: common-commits@hadoop.apache.org Date: Wed, 30 Aug 2017 06:13:45 -0000 Message-Id: <3a194b67d92f48e982af41a0ec37acdf@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [26/50] [abbrv] hadoop git commit: YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena. archived-at: Wed, 30 Aug 2017 06:13:35 -0000 YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7594d1de Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7594d1de Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7594d1de Branch: refs/heads/trunk Commit: 7594d1de7bbc34cd2e64202095a5e1757154d7d0 Parents: 9f65405 Author: Rohith Sharma K S Authored: Mon Jul 31 17:26:34 2017 +0530 Committer: Varun Saxena Committed: Wed Aug 30 11:29:53 2017 +0530 ---------------------------------------------------------------------- .../v2/app/rm/RMContainerAllocator.java | 9 +- .../api/protocolrecords/AllocateResponse.java | 32 ++-- .../hadoop/yarn/api/records/CollectorInfo.java | 55 +++++++ .../src/main/proto/yarn_protos.proto | 5 + .../src/main/proto/yarn_service_protos.proto | 2 +- .../api/async/impl/AMRMClientAsyncImpl.java | 6 +- .../ApplicationMasterServiceProtoTestBase.java | 72 +++++++++ .../hadoop/yarn/client/ProtocolHATestBase.java | 20 ++- ...ationMasterServiceProtocolForTimelineV2.java | 71 +++++++++ ...estApplicationMasterServiceProtocolOnHA.java | 46 +----- .../api/async/impl/TestAMRMClientAsync.java | 2 +- .../impl/pb/AllocateResponsePBImpl.java | 37 ++++- .../records/impl/pb/CollectorInfoPBImpl.java | 148 +++++++++++++++++++ .../hadoop/yarn/api/TestPBImplRecords.java | 2 + .../ReportNewCollectorInfoRequest.java | 5 +- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 25 +++- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 21 ++- .../pb/ReportNewCollectorInfoRequestPBImpl.java | 4 +- .../server/api/records/AppCollectorData.java | 27 +++- .../records/impl/pb/AppCollectorDataPBImpl.java | 29 +++- .../yarn_server_common_service_protos.proto | 2 + .../java/org/apache/hadoop/yarn/TestRPC.java | 30 +++- .../hadoop/yarn/TestYarnServerApiClasses.java | 4 +- .../nodemanager/NodeStatusUpdaterImpl.java | 1 - .../application/ApplicationImpl.java | 1 - .../ApplicationMasterService.java | 2 - .../resourcemanager/DefaultAMSProcessor.java | 8 +- .../server/resourcemanager/rmapp/RMApp.java | 15 +- .../server/resourcemanager/rmapp/RMAppImpl.java | 10 +- .../applicationsmanager/MockAsm.java | 6 + .../server/resourcemanager/rmapp/MockRMApp.java | 6 + .../TestTimelineServiceClientIntegration.java | 2 +- .../security/TestTimelineAuthFilterForV2.java | 121 +++++++++++---- .../collector/AppLevelTimelineCollector.java | 24 +++ .../AppLevelTimelineCollectorWithAgg.java | 4 +- .../collector/NodeTimelineCollectorManager.java | 83 +++++++++-- .../PerNodeTimelineCollectorsAuxService.java | 7 +- ...neV2DelegationTokenSecretManagerService.java | 31 ++++ .../TestNMTimelineCollectorManager.java | 4 +- 39 files changed, 829 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 0952797..969ec4c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -878,13 +878,16 @@ public class RMContainerAllocator extends RMContainerRequestor handleUpdatedNodes(response); handleJobPriorityChange(response); // handle receiving the timeline collector address for this app - String collectorAddr = response.getCollectorAddr(); + String collectorAddr = null; + if (response.getCollectorInfo() != null) { + collectorAddr = response.getCollectorInfo().getCollectorAddr(); + } + MRAppMaster.RunningAppContext appContext = (MRAppMaster.RunningAppContext)this.getContext(); if (collectorAddr != null && !collectorAddr.isEmpty() && appContext.getTimelineV2Client() != null) { - appContext.getTimelineV2Client().setTimelineServiceAddress( - response.getCollectorAddr()); + appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr); } for (ContainerStatus cont : finishedContainers) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index d3ca765..9b254ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -92,21 +93,21 @@ public abstract class AllocateResponse { .preemptionMessage(preempt).nmTokens(nmTokens).build(); } - @Public + @Private @Unstable public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, - List updatedContainers) { + CollectorInfo collectorInfo) { return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) .responseId(responseId) .completedContainersStatuses(completedContainers) .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) .availableResources(availResources).amCommand(command) .preemptionMessage(preempt).nmTokens(nmTokens) - .updatedContainers(updatedContainers).build(); + .collectorInfo(collectorInfo).build(); } @Private @@ -133,7 +134,7 @@ public abstract class AllocateResponse { List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, Token amRMToken, - List updatedContainers, String collectorAddr) { + List updatedContainers, CollectorInfo collectorInfo) { return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) .responseId(responseId) .completedContainersStatuses(completedContainers) @@ -141,7 +142,7 @@ public abstract class AllocateResponse { .availableResources(availResources).amCommand(command) .preemptionMessage(preempt).nmTokens(nmTokens) .updatedContainers(updatedContainers).amRmToken(amRMToken) - .collectorAddr(collectorAddr).build(); + .collectorInfo(collectorInfo).build(); } /** @@ -333,17 +334,18 @@ public abstract class AllocateResponse { public abstract void setApplicationPriority(Priority priority); /** - * The address of collector that belong to this app + * The data associated with the collector that belongs to this app. Contains + * address and token alongwith identification information. * - * @return The address of collector that belong to this attempt + * @return The data of collector that belong to this attempt */ @Public @Unstable - public abstract String getCollectorAddr(); + public abstract CollectorInfo getCollectorInfo(); @Private @Unstable - public abstract void setCollectorAddr(String collectorAddr); + public abstract void setCollectorInfo(CollectorInfo info); /** * Get the list of container update errors to inform the @@ -559,15 +561,17 @@ public abstract class AllocateResponse { } /** - * Set the collectorAddr of the response. - * @see AllocateResponse#setCollectorAddr(String) - * @param collectorAddr collectorAddr of the response + * Set the collectorInfo of the response. + * @see AllocateResponse#setCollectorInfo(CollectorInfo) + * @param collectorInfo collectorInfo of the response which + * contains collector address, RM id, version and collector token. * @return {@link AllocateResponseBuilder} */ @Private @Unstable - public AllocateResponseBuilder collectorAddr(String collectorAddr) { - allocateResponse.setCollectorAddr(collectorAddr); + public AllocateResponseBuilder collectorInfo( + CollectorInfo collectorInfo) { + allocateResponse.setCollectorInfo(collectorInfo); return this; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java new file mode 100644 index 0000000..960c992 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.util.Records; + +/** + * Collector info containing collector address and collector token passed from + * RM to AM in Allocate Response. + */ +@Private +@InterfaceStability.Unstable +public abstract class CollectorInfo { + + protected static final long DEFAULT_TIMESTAMP_VALUE = -1; + + public static CollectorInfo newInstance(String collectorAddr, Token token) { + CollectorInfo amCollectorInfo = + Records.newRecord(CollectorInfo.class); + amCollectorInfo.setCollectorAddr(collectorAddr); + amCollectorInfo.setCollectorToken(token); + return amCollectorInfo; + } + + public abstract String getCollectorAddr(); + + public abstract void setCollectorAddr(String addr); + + /** + * Get delegation token for app collector which AM will use to publish + * entities. + * @return the delegation token for app collector. + */ + public abstract Token getCollectorToken(); + + public abstract void setCollectorToken(Token token); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 81ebd79..c5f485f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -613,3 +613,8 @@ message StringBytesMapProto { optional string key = 1; optional bytes value = 2; } + +message CollectorInfoProto { + optional string collector_addr = 1; + optional hadoop.common.TokenProto collector_token = 2; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index b92c46e..7a7f035 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -112,7 +112,7 @@ message AllocateResponseProto { repeated NMTokenProto nm_tokens = 9; optional hadoop.common.TokenProto am_rm_token = 12; optional PriorityProto application_priority = 13; - optional string collector_addr = 14; + optional CollectorInfoProto collector_info = 14; repeated UpdateContainerErrorProto update_errors = 15; repeated UpdatedContainerProto updated_containers = 16; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 6711da2..265badb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -325,7 +325,11 @@ extends AMRMClientAsync { } AllocateResponse response = (AllocateResponse) object; - String collectorAddress = response.getCollectorAddr(); + String collectorAddress = null; + if (response.getCollectorInfo() != null) { + collectorAddress = response.getCollectorInfo().getCollectorAddr(); + } + TimelineV2Client timelineClient = client.getRegisteredTimelineV2Client(); if (timelineClient != null && collectorAddress != null http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java new file mode 100644 index 0000000..4521018 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java @@ -0,0 +1,72 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; + +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; + +/** + * Test Base for Application Master Service Protocol. + */ +public abstract class ApplicationMasterServiceProtoTestBase + extends ProtocolHATestBase { + + private ApplicationMasterProtocol amClient; + private ApplicationAttemptId attemptId; + + protected void startupHAAndSetupClient() throws Exception { + attemptId = this.cluster.createFakeApplicationAttemptId(); + + Token appToken = + this.cluster.getResourceManager().getRMContext() + .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); + appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf)); + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + UserGroupInformation.getCurrentUser().addToken(appToken); + syncToken(appToken); + amClient = ClientRMProxy + .createRMProxy(this.conf, ApplicationMasterProtocol.class); + } + + @After + public void shutDown() { + if(this.amClient != null) { + RPC.stopProxy(this.amClient); + } + } + + protected ApplicationMasterProtocol getAMClient() { + return amClient; + } + + private void syncToken(Token token) throws IOException { + for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { + this.cluster.getResourceManager(i).getRMContext() + .getAMRMTokenSecretManager().addPersistedPassword(token); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index a8e9132..f4005e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -804,11 +805,20 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes { } public AllocateResponse createFakeAllocateResponse() { - return AllocateResponse.newInstance(-1, - new ArrayList(), - new ArrayList(), new ArrayList(), - Resource.newInstance(1024, 2), null, 1, - null, new ArrayList()); + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + return AllocateResponse.newInstance(-1, + new ArrayList(), new ArrayList(), + new ArrayList(), Resource.newInstance(1024, 2), null, 1, + null, new ArrayList(), CollectorInfo.newInstance( + "host:port", Token.newInstance(new byte[] {0}, "TIMELINE", + new byte[] {0}, "rm"))); + } else { + return AllocateResponse.newInstance(-1, + new ArrayList(), + new ArrayList(), new ArrayList(), + Resource.newInstance(1024, 2), null, 1, + null, new ArrayList()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java new file mode 100644 index 0000000..be8c302 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java @@ -0,0 +1,71 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests Application Master Protocol with timeline service v2 enabled. + */ +public class TestApplicationMasterServiceProtocolForTimelineV2 + extends ApplicationMasterServiceProtoTestBase { + + @Before + public void initialize() throws Exception { + HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf); + HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE + 200, conf); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + startHACluster(0, false, false, true); + super.startupHAAndSetupClient(); + } + + @Test(timeout = 15000) + public void testAllocateForTimelineV2OnHA() + throws YarnException, IOException { + AllocateRequest request = AllocateRequest.newInstance(0, 50f, + new ArrayList(), + new ArrayList(), + ResourceBlacklistRequest.newInstance(new ArrayList(), + new ArrayList())); + AllocateResponse response = getAMClient().allocate(request); + Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); + Assert.assertNotNull(response.getCollectorInfo()); + Assert.assertEquals("host:port", + response.getCollectorInfo().getCollectorAddr()); + Assert.assertNotNull(response.getCollectorInfo().getCollectorToken()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java index ad86fb3..c2f39a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java @@ -23,10 +23,6 @@ import java.util.ArrayList; import org.junit.Assert; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -34,45 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.junit.After; import org.junit.Before; import org.junit.Test; public class TestApplicationMasterServiceProtocolOnHA - extends ProtocolHATestBase { - private ApplicationMasterProtocol amClient; - private ApplicationAttemptId attemptId ; - + extends ApplicationMasterServiceProtoTestBase { @Before public void initialize() throws Exception { startHACluster(0, false, false, true); - attemptId = this.cluster.createFakeApplicationAttemptId(); - - Token appToken = - this.cluster.getResourceManager().getRMContext() - .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); - appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf)); - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); - UserGroupInformation.getCurrentUser().addToken(appToken); - syncToken(appToken); - - amClient = ClientRMProxy - .createRMProxy(this.conf, ApplicationMasterProtocol.class); - } - - @After - public void shutDown() { - if(this.amClient != null) { - RPC.stopProxy(this.amClient); - } + super.startupHAAndSetupClient(); } @Test(timeout = 15000) @@ -81,7 +52,7 @@ public class TestApplicationMasterServiceProtocolOnHA RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance("localhost", 0, ""); RegisterApplicationMasterResponse response = - amClient.registerApplicationMaster(request); + getAMClient().registerApplicationMaster(request); Assert.assertEquals(response, this.cluster.createFakeRegisterApplicationMasterResponse()); } @@ -93,7 +64,7 @@ public class TestApplicationMasterServiceProtocolOnHA FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.SUCCEEDED, "", ""); FinishApplicationMasterResponse response = - amClient.finishApplicationMaster(request); + getAMClient().finishApplicationMaster(request); Assert.assertEquals(response, this.cluster.createFakeFinishApplicationMasterResponse()); } @@ -105,14 +76,7 @@ public class TestApplicationMasterServiceProtocolOnHA new ArrayList(), ResourceBlacklistRequest.newInstance(new ArrayList(), new ArrayList())); - AllocateResponse response = amClient.allocate(request); + AllocateResponse response = getAMClient().allocate(request); Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); } - - private void syncToken(Token token) throws IOException { - for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { - this.cluster.getResourceManager(i).getRMContext() - .getAMRMTokenSecretManager().addPersistedPassword(token); - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index ba38340..56826c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -426,7 +426,7 @@ public class TestAMRMClientAsync { } AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new ArrayList(), null, null, 1, null, nmTokens, + new ArrayList(), null, null, 1, null, nmTokens, null, updatedContainers); return response; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 1455d6a..ff35da8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; @@ -80,6 +83,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { private PreemptionMessage preempt; private Token amrmToken = null; private Priority appPriority = null; + private CollectorInfo collectorInfo = null; public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); @@ -162,6 +166,9 @@ public class AllocateResponsePBImpl extends AllocateResponse { if (this.amrmToken != null) { builder.setAmRmToken(convertToProtoFormat(this.amrmToken)); } + if (this.collectorInfo != null) { + builder.setCollectorInfo(convertToProtoFormat(this.collectorInfo)); + } if (this.appPriority != null) { builder.setApplicationPriority(convertToProtoFormat(this.appPriority)); } @@ -398,19 +405,25 @@ public class AllocateResponsePBImpl extends AllocateResponse { @Override - public synchronized String getCollectorAddr() { + public synchronized CollectorInfo getCollectorInfo() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - return p.getCollectorAddr(); + if (this.collectorInfo != null) { + return this.collectorInfo; + } + if (!p.hasCollectorInfo()) { + return null; + } + this.collectorInfo = convertFromProtoFormat(p.getCollectorInfo()); + return this.collectorInfo; } @Override - public synchronized void setCollectorAddr(String collectorAddr) { + public synchronized void setCollectorInfo(CollectorInfo info) { maybeInitBuilder(); - if (collectorAddr == null) { - builder.clearCollectorAddr(); - return; + if (info == null) { + builder.clearCollectorInfo(); } - builder.setCollectorAddr(collectorAddr); + this.collectorInfo = info; } @Override @@ -718,6 +731,16 @@ public class AllocateResponsePBImpl extends AllocateResponse { return ((NodeReportPBImpl)t).getProto(); } + private synchronized CollectorInfoPBImpl convertFromProtoFormat( + CollectorInfoProto p) { + return new CollectorInfoPBImpl(p); + } + + private synchronized CollectorInfoProto convertToProtoFormat( + CollectorInfo t) { + return ((CollectorInfoPBImpl)t).getProto(); + } + private synchronized ContainerPBImpl convertFromProtoFormat( ContainerProto p) { return new ContainerPBImpl(p); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java new file mode 100644 index 0000000..bb54133 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java @@ -0,0 +1,148 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.records.CollectorInfo; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +/** + * Protocol record implementation of {@link CollectorInfo}. + */ +public class CollectorInfoPBImpl extends CollectorInfo { + + private CollectorInfoProto proto = CollectorInfoProto.getDefaultInstance(); + + private CollectorInfoProto.Builder builder = null; + private boolean viaProto = false; + + private String collectorAddr = null; + private Token collectorToken = null; + + + public CollectorInfoPBImpl() { + builder = CollectorInfoProto.newBuilder(); + } + + public CollectorInfoPBImpl(CollectorInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + public CollectorInfoProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = CollectorInfoProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public String getCollectorAddr() { + CollectorInfoProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorAddr == null && p.hasCollectorAddr()) { + this.collectorAddr = p.getCollectorAddr(); + } + return this.collectorAddr; + } + + @Override + public void setCollectorAddr(String addr) { + maybeInitBuilder(); + if (collectorAddr == null) { + builder.clearCollectorAddr(); + } + this.collectorAddr = addr; + } + + @Override + public Token getCollectorToken() { + CollectorInfoProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorToken == null && p.hasCollectorToken()) { + this.collectorToken = convertFromProtoFormat(p.getCollectorToken()); + } + return this.collectorToken; + } + + @Override + public void setCollectorToken(Token token) { + maybeInitBuilder(); + if (token == null) { + builder.clearCollectorToken(); + } + this.collectorToken = token; + } + + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + + private void mergeLocalToBuilder() { + if (this.collectorAddr != null) { + builder.setCollectorAddr(this.collectorAddr); + } + if (this.collectorToken != null) { + builder.setCollectorToken(convertToProtoFormat(this.collectorToken)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index bb688c9..a3f5491 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestP import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -406,6 +407,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { generateByNewInstance(CommitResponse.class); generateByNewInstance(ApplicationTimeout.class); generateByNewInstance(QueueConfigurations.class); + generateByNewInstance(CollectorInfo.class); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java index 1503eca..a4c1a38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java @@ -22,6 +22,7 @@ import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; @@ -37,11 +38,11 @@ public abstract class ReportNewCollectorInfoRequest { } public static ReportNewCollectorInfoRequest newInstance( - ApplicationId id, String collectorAddr) { + ApplicationId id, String collectorAddr, Token token) { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList( - Arrays.asList(AppCollectorData.newInstance(id, collectorAddr))); + Arrays.asList(AppCollectorData.newInstance(id, collectorAddr, token))); return request; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 73a8abe..c07a6eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -26,23 +26,26 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -164,9 +167,13 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { builder.clearRegisteringCollectors(); for (Map.Entry entry : registeringCollectors.entrySet()) { + AppCollectorData data = entry.getValue(); builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue().getCollectorAddr())); + .setAppCollectorAddr(data.getCollectorAddr()) + .setAppCollectorToken(convertToProtoFormat(data.getCollectorToken())) + .setRmIdentifier(data.getRMIdentifier()) + .setVersion(data.getVersion())); } } @@ -267,8 +274,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { this.registeringCollectors = new HashMap<>(); for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); + Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); AppCollectorData data = AppCollectorData.newInstance(appId, - c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(), + collectorToken); this.registeringCollectors.put(appId, data); } } @@ -309,6 +318,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { return ((MasterKeyPBImpl)t).getProto(); } + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + @Override public Set getNodeLabels() { initNodeLabels(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index ad81a1f..a1e6a21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -26,31 +26,34 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -154,6 +157,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { builder.addAppCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) .setAppCollectorAddr(data.getCollectorAddr()) + .setAppCollectorToken( + convertToProtoFormat(entry.getValue().getCollectorToken())) .setRmIdentifier(data.getRMIdentifier()) .setVersion(data.getVersion())); } @@ -599,8 +604,10 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { this.appCollectorsMap = new HashMap<>(); for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); + Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); AppCollectorData data = AppCollectorData.newInstance(appId, - c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(), + collectorToken); this.appCollectorsMap.put(appId, data); } } @@ -780,5 +787,13 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { SignalContainerRequest t) { return ((SignalContainerRequestPBImpl)t).getProto(); } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java index 3f3dcf5..5ffc3a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java @@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; -import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; public class ReportNewCollectorInfoRequestPBImpl extends ReportNewCollectorInfoRequest { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java index da2e5de..5266dca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.records; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.util.Records; @@ -31,20 +32,32 @@ public abstract class AppCollectorData { protected static final long DEFAULT_TIMESTAMP_VALUE = -1; public static AppCollectorData newInstance( - ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + ApplicationId id, String collectorAddr, long rmIdentifier, long version, + Token token) { AppCollectorData appCollectorData = Records.newRecord(AppCollectorData.class); appCollectorData.setApplicationId(id); appCollectorData.setCollectorAddr(collectorAddr); appCollectorData.setRMIdentifier(rmIdentifier); appCollectorData.setVersion(version); + appCollectorData.setCollectorToken(token); return appCollectorData; } + public static AppCollectorData newInstance( + ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + return newInstance(id, collectorAddr, rmIdentifier, version, null); + } + public static AppCollectorData newInstance(ApplicationId id, - String collectorAddr) { + String collectorAddr, Token token) { return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE, - DEFAULT_TIMESTAMP_VALUE); + DEFAULT_TIMESTAMP_VALUE, token); + } + + public static AppCollectorData newInstance(ApplicationId id, + String collectorAddr) { + return newInstance(id, collectorAddr, null); } /** @@ -101,4 +114,12 @@ public abstract class AppCollectorData { public abstract void setVersion(long version); + /** + * Get delegation token for app collector which AM will use to publish + * entities. + * @return the delegation token for app collector. + */ + public abstract Token getCollectorToken(); + + public abstract void setCollectorToken(Token token); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java index 7d3a805..7144f51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java @@ -19,10 +19,11 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; - +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder; @@ -43,6 +44,7 @@ public class AppCollectorDataPBImpl extends AppCollectorData { private String collectorAddr = null; private Long rmIdentifier = null; private Long version = null; + private Token collectorToken = null; public AppCollectorDataPBImpl() { builder = AppCollectorDataProto.newBuilder(); @@ -158,6 +160,24 @@ public class AppCollectorDataPBImpl extends AppCollectorData { builder.setVersion(version); } + @Override + public Token getCollectorToken() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorToken == null && p.hasAppCollectorToken()) { + this.collectorToken = new TokenPBImpl(p.getAppCollectorToken()); + } + return this.collectorToken; + } + + @Override + public void setCollectorToken(Token token) { + maybeInitBuilder(); + if (token == null) { + builder.clearAppCollectorToken(); + } + this.collectorToken = token; + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -195,6 +215,9 @@ public class AppCollectorDataPBImpl extends AppCollectorData { if (this.version != null) { builder.setVersion(this.version); } + if (this.collectorToken != null) { + builder.setAppCollectorToken( + ((TokenPBImpl)this.collectorToken).getProto()); + } } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c33b913..c2ba677 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -22,6 +22,7 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; package hadoop.yarn; +import "Security.proto"; import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; import "yarn_service_protos.proto"; @@ -139,6 +140,7 @@ message AppCollectorDataProto { optional string app_collector_addr = 2; optional int64 rm_identifier = 3 [default = -1]; optional int64 version = 4 [default = -1]; + optional hadoop.common.TokenProto app_collector_token = 5; } ////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 8291197..82dfaea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; @@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; @@ -93,6 +95,21 @@ public class TestRPC { "collectors' number in ReportNewCollectorInfoRequest is not ONE."; public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0"; + private static final Token DEFAULT_COLLECTOR_TOKEN; + static { + TimelineDelegationTokenIdentifier identifier = + new TimelineDelegationTokenIdentifier(); + identifier.setOwner(new Text("user")); + identifier.setRenewer(new Text("user")); + identifier.setRealUser(new Text("user")); + long now = Time.now(); + identifier.setIssueDate(now); + identifier.setMaxDate(now + 1000L); + identifier.setMasterKeyId(500); + identifier.setSequenceNumber(5); + DEFAULT_COLLECTOR_TOKEN = Token.newInstance(identifier.getBytes(), + identifier.getKind().toString(), identifier.getBytes(), "localhost:0"); + } public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, 0); @@ -173,7 +190,16 @@ public class TestRPC { try { ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest.newInstance( - DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR); + DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, null); + proxy.reportNewCollectorInfo(request); + } catch (YarnException e) { + Assert.fail("RPC call failured is not expected here."); + } + + try { + ReportNewCollectorInfoRequest request = + ReportNewCollectorInfoRequest.newInstance( + DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TOKEN); proxy.reportNewCollectorInfo(request); } catch (YarnException e) { Assert.fail("RPC call failured is not expected here."); @@ -437,6 +463,8 @@ public class TestRPC { DEFAULT_APP_ID); Assert.assertEquals(appCollector.getCollectorAddr(), DEFAULT_COLLECTOR_ADDR); + Assert.assertTrue(appCollector.getCollectorToken() == null || + appCollector.getCollectorToken().equals(DEFAULT_COLLECTOR_TOKEN)); } else { throw new YarnException(ILLEGAL_NUMBER_MESSAGE); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 15071f3..a1890dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; @@ -351,7 +352,8 @@ public class TestYarnServerApiClasses { private Map getCollectors() { ApplicationId appID = ApplicationId.newInstance(1L, 1); String collectorAddr = "localhost:0"; - AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); + AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr, + Token.newInstance(new byte[0], "kind", new byte[0], "s")); Map collectorMap = new HashMap<>(); collectorMap.put(appID, data); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 43cd135..35b7cb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; - import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 0097cd2..39be7a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; -import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index a245a97..479aa43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -45,8 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - - import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 2d677a7..a993d69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; @@ -54,7 +55,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt @@ -294,9 +294,9 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { // add collector address for this application if (YarnConfiguration.timelineServiceV2Enabled( getRmContext().getYarnConfiguration())) { - AppCollectorData data = app.getCollectorData(); - if (data != null) { - response.setCollectorAddr(data.getCollectorAddr()); + CollectorInfo collectorInfo = app.getCollectorInfo(); + if (collectorInfo != null) { + response.setCollectorInfo(collectorInfo); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 1a0b920..93c41b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -187,14 +188,24 @@ public interface RMApp extends EventHandler { * only if the timeline service v.2 is enabled. * * @return the data for the application's collector, including collector - * address, collector ID. Return null if the timeline service v.2 is not - * enabled. + * address, RM ID, version and collector token. Return null if the timeline + * service v.2 is not enabled. */ @InterfaceAudience.Private @InterfaceStability.Unstable AppCollectorData getCollectorData(); /** + * The timeline collector information to be sent to AM. It should be used + * only if the timeline service v.2 is enabled. + * + * @return collector info, including collector address and collector token. + * Return null if the timeline service v.2 is not enabled. + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + CollectorInfo getCollectorInfo(); + /** * The original tracking url for the application master. * @return the original tracking url for the application master. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index dbddc5f..13b0792 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeout; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -167,6 +168,7 @@ public class RMAppImpl implements RMApp, Recoverable { private int firstAttemptIdInStateStore = 1; private int nextAttemptId = 1; private AppCollectorData collectorData; + private CollectorInfo collectorInfo; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -530,7 +532,7 @@ public class RMAppImpl implements RMApp, Recoverable { */ public void startTimelineCollector() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(applicationId); + new AppLevelTimelineCollector(applicationId, user); rmContext.getRMTimelineCollectorManager().putIfAbsent( applicationId, collector); } @@ -618,6 +620,12 @@ public class RMAppImpl implements RMApp, Recoverable { public void setCollectorData(AppCollectorData incomingData) { this.collectorData = incomingData; + this.collectorInfo = CollectorInfo.newInstance( + incomingData.getCollectorAddr(), incomingData.getCollectorToken()); + } + + public CollectorInfo getCollectorInfo() { + return this.collectorInfo; } public void removeCollectorData() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 3234d6f..f826631 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -239,6 +240,11 @@ public abstract class MockAsm extends MockApps { public boolean isAppInCompletedStates() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public CollectorInfo getCollectorInfo() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7594d1de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index b0d47a1..39a7f99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -325,4 +326,9 @@ public class MockRMApp implements RMApp { public boolean isAppInCompletedStates() { return false; } + + @Override + public CollectorInfo getCollectorInfo() { + throw new UnsupportedOperationException("Not supported yet."); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org