Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6C4D7200CF3 for ; Wed, 30 Aug 2017 08:13:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6AC421686A2; Wed, 30 Aug 2017 06:13:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E75ED16869B for ; Wed, 30 Aug 2017 08:13:35 +0200 (CEST) Received: (qmail 43774 invoked by uid 500); 30 Aug 2017 06:13:24 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 40113 invoked by uid 99); 30 Aug 2017 06:13:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Aug 2017 06:13:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4E84FF5658; Wed, 30 Aug 2017 06:13:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: varunsaxena@apache.org To: common-commits@hadoop.apache.org Date: Wed, 30 Aug 2017 06:14:01 -0000 Message-Id: <9f30a76be97047b9a6731ea8bb04439c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [42/50] [abbrv] hadoop git commit: MAPREDUCE-6838. [ATSv2 Security] Add timeline delegation token received in allocate response to UGI. Contributed by Varun Saxena archived-at: Wed, 30 Aug 2017 06:13:37 -0000 MAPREDUCE-6838. [ATSv2 Security] Add timeline delegation token received in allocate response to UGI. Contributed by Varun Saxena Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08f40bcc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08f40bcc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08f40bcc Branch: refs/heads/trunk Commit: 08f40bcc7f4174857bb1fc7c8eb1108d5caaafb3 Parents: bea3e4d Author: Jian He Authored: Mon Aug 21 22:08:07 2017 -0700 Committer: Varun Saxena Committed: Wed Aug 30 11:29:54 2017 +0530 ---------------------------------------------------------------------- .../v2/app/rm/RMContainerAllocator.java | 17 +-- .../v2/app/rm/TestRMContainerAllocator.java | 137 +++++++++++++++++++ .../hadoop/yarn/api/records/CollectorInfo.java | 4 + .../api/async/impl/AMRMClientAsyncImpl.java | 13 +- .../yarn/client/api/TimelineV2Client.java | 11 +- .../client/api/impl/TimelineV2ClientImpl.java | 80 ++++++++++- .../api/impl/TestTimelineClientV2Impl.java | 56 +++++++- .../timelineservice/NMTimelinePublisher.java | 3 +- .../TestTimelineServiceClientIntegration.java | 13 +- .../security/TestTimelineAuthFilterForV2.java | 3 +- 10 files changed, 301 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 969ec4c..0dc7642 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -848,7 +848,8 @@ public class RMContainerAllocator extends RMContainerRequestor updateAMRMToken(response.getAMRMToken()); } - List finishedContainers = response.getCompletedContainersStatuses(); + List finishedContainers = + response.getCompletedContainersStatuses(); // propagate preemption requests final PreemptionMessage preemptReq = response.getPreemptionMessage(); @@ -877,19 +878,13 @@ public class RMContainerAllocator extends RMContainerRequestor handleUpdatedNodes(response); handleJobPriorityChange(response); - // handle receiving the timeline collector address for this app - String collectorAddr = null; - if (response.getCollectorInfo() != null) { - collectorAddr = response.getCollectorInfo().getCollectorAddr(); - } - + // Handle receiving the timeline collector address and token for this app. MRAppMaster.RunningAppContext appContext = (MRAppMaster.RunningAppContext)this.getContext(); - if (collectorAddr != null && !collectorAddr.isEmpty() - && appContext.getTimelineV2Client() != null) { - appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr); + if (appContext.getTimelineV2Client() != null) { + appContext.getTimelineV2Client(). + setTimelineCollectorInfo(response.getCollectorInfo()); } - for (ContainerStatus cont : finishedContainers) { processFinishedContainer(cont); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 6c51626..6c74a7a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; @@ -27,6 +28,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -99,6 +101,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -121,6 +125,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -137,9 +142,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService.TimelineV2DelegationTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; +import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.After; import org.junit.Assert; @@ -749,6 +756,96 @@ public class TestRMContainerAllocator { } @Test + public void testUpdateCollectorInfo() throws Exception { + LOG.info("Running testUpdateCollectorInfo"); + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + JobId jobId = MRBuilderUtils.newJobId(appId, 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + String localAddr = "localhost:1234"; + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + // Generate a timeline delegation token. + TimelineDelegationTokenIdentifier ident = + new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()), + new Text("renewer"), null); + ident.setSequenceNumber(1); + Token collectorToken = + new Token (ident.getBytes(), + new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME, + new Text(localAddr)); + org.apache.hadoop.yarn.api.records.Token token = + org.apache.hadoop.yarn.api.records.Token.newInstance( + collectorToken.getIdentifier(), collectorToken.getKind().toString(), + collectorToken.getPassword(), + collectorToken.getService().toString()); + CollectorInfo collectorInfo = CollectorInfo.newInstance(localAddr, token); + // Mock scheduler to server Allocate request. + final MockSchedulerForTimelineCollector mockScheduler = + new MockSchedulerForTimelineCollector(collectorInfo); + MyContainerAllocator allocator = + new MyContainerAllocator(null, conf, attemptId, mockJob, + SystemClock.getInstance()) { + @Override + protected void register() { + } + + @Override + protected ApplicationMasterProtocol createSchedulerProxy() { + return mockScheduler; + } + }; + // Initially UGI should have no tokens. + ArrayList> tokens = + new ArrayList<>(ugi.getTokens()); + assertEquals(0, tokens.size()); + TimelineV2Client client = spy(TimelineV2Client.createTimelineClient(appId)); + client.init(conf); + when(((RunningAppContext)allocator.getContext()).getTimelineV2Client()). + thenReturn(client); + + // Send allocate request to RM and fetch collector address and token. + allocator.schedule(); + verify(client).setTimelineCollectorInfo(collectorInfo); + // Verify if token has been updated in UGI. + tokens = new ArrayList<>(ugi.getTokens()); + assertEquals(1, tokens.size()); + assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME, + tokens.get(0).getKind()); + assertEquals(collectorToken.decodeIdentifier(), + tokens.get(0).decodeIdentifier()); + + // Generate new collector token, send allocate request to RM and fetch the + // new token. + ident.setSequenceNumber(100); + Token collectorToken1 = + new Token (ident.getBytes(), + new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME, + new Text(localAddr)); + token = org.apache.hadoop.yarn.api.records.Token.newInstance( + collectorToken1.getIdentifier(), collectorToken1.getKind().toString(), + collectorToken1.getPassword(), collectorToken1.getService().toString()); + collectorInfo = CollectorInfo.newInstance(localAddr, token); + mockScheduler.updateCollectorInfo(collectorInfo); + allocator.schedule(); + verify(client).setTimelineCollectorInfo(collectorInfo); + // Verify if new token has been updated in UGI. + tokens = new ArrayList<>(ugi.getTokens()); + assertEquals(1, tokens.size()); + assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME, + tokens.get(0).getKind()); + assertEquals(collectorToken1.decodeIdentifier(), + tokens.get(0).decodeIdentifier()); + allocator.close(); + } + + @Test public void testMapReduceScheduling() throws Exception { LOG.info("Running testMapReduceScheduling"); @@ -3488,6 +3585,46 @@ public class TestRMContainerAllocator { } } + private static class MockSchedulerForTimelineCollector + implements ApplicationMasterProtocol { + CollectorInfo collectorInfo; + + public MockSchedulerForTimelineCollector(CollectorInfo info) { + this.collectorInfo = info; + } + + void updateCollectorInfo(CollectorInfo info) { + collectorInfo = info; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return Records.newRecord(RegisterApplicationMasterResponse.class); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + return FinishApplicationMasterResponse.newInstance(false); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + AllocateResponse response = AllocateResponse.newInstance( + request.getResponseId(), Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Resource.newInstance(512000, 1024), null, 10, null, + Collections.emptyList()); + response.setCollectorInfo(collectorInfo); + return response; + } + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java index 960c992..d22b9fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java @@ -32,6 +32,10 @@ public abstract class CollectorInfo { protected static final long DEFAULT_TIMESTAMP_VALUE = -1; + public static CollectorInfo newInstance(String collectorAddr) { + return newInstance(collectorAddr, null); + } + public static CollectorInfo newInstance(String collectorAddr, Token token) { CollectorInfo amCollectorInfo = Records.newRecord(CollectorInfo.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 265badb..d12b108 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -68,8 +68,6 @@ extends AMRMClientAsync { private volatile boolean keepRunning; private volatile float progress; - - private volatile String collectorAddr; /** * @@ -332,14 +330,9 @@ extends AMRMClientAsync { TimelineV2Client timelineClient = client.getRegisteredTimelineV2Client(); - if (timelineClient != null && collectorAddress != null - && !collectorAddress.isEmpty()) { - if (collectorAddr == null - || !collectorAddr.equals(collectorAddress)) { - collectorAddr = collectorAddress; - timelineClient.setTimelineServiceAddress(collectorAddress); - LOG.info("collectorAddress " + collectorAddress); - } + if (timelineClient != null && response.getCollectorInfo() != null) { + timelineClient. + setTimelineCollectorInfo(response.getCollectorInfo()); } List updatedNodes = response.getUpdatedNodes(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java index 32cf1e9..da81a91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.CollectorInfo; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -83,10 +85,13 @@ public abstract class TimelineV2Client extends CompositeService { /** *

- * Update the timeline service address where the request will be sent to. + * Update collector info received in AllocateResponse which contains the + * timeline service address where the request will be sent to and the timeline + * delegation token which will be used to send the request. *

* - * @param address the timeline service address + * @param collectorInfo Collector info which contains the timeline service + * address and timeline delegation token. */ - public abstract void setTimelineServiceAddress(String address); + public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java index 128ae7a..97d1364 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetSocketAddress; import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.concurrent.BlockingQueue; @@ -39,15 +40,22 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.CollectorInfo; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.core.util.MultivaluedMapImpl; @@ -62,6 +70,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client { private TimelineEntityDispatcher entityDispatcher; private volatile String timelineServiceAddress; + @VisibleForTesting + volatile Token currentTimelineToken = null; // Retry parameters for identifying new timeline service // TODO consider to merge with connection retry @@ -100,7 +110,6 @@ public class TimelineV2ClientImpl extends TimelineV2Client { authUgi = ugi; doAsUser = null; } - // TODO need to add/cleanup filter retry later for ATSV2. similar to V1 DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token(); @@ -144,8 +153,73 @@ public class TimelineV2ClientImpl extends TimelineV2Client { } @Override - public void setTimelineServiceAddress(String address) { - this.timelineServiceAddress = address; + public void setTimelineCollectorInfo(CollectorInfo collectorInfo) { + if (collectorInfo == null) { + LOG.warn("Not setting collector info as it is null."); + return; + } + // First update the token so that it is available when collector address is + // used. + if (collectorInfo.getCollectorToken() != null) { + // Use collector address to update token service if its not available. + setTimelineDelegationToken( + collectorInfo.getCollectorToken(), collectorInfo.getCollectorAddr()); + } + // Update timeline service address. + if (collectorInfo.getCollectorAddr() != null && + !collectorInfo.getCollectorAddr().isEmpty() && + !collectorInfo.getCollectorAddr().equals(timelineServiceAddress)) { + this.timelineServiceAddress = collectorInfo.getCollectorAddr(); + LOG.info("Updated timeline service address to " + timelineServiceAddress); + } + } + + private void setTimelineDelegationToken(Token delegationToken, + String collectorAddr) { + // Checks below are to ensure that an invalid token is not updated in UGI. + // This is required because timeline token is set via a public API. + if (!delegationToken.getKind().equals( + TimelineDelegationTokenIdentifier.KIND_NAME.toString())) { + LOG.warn("Timeline token to be updated should be of kind " + + TimelineDelegationTokenIdentifier.KIND_NAME); + return; + } + if (collectorAddr == null || collectorAddr.isEmpty()) { + collectorAddr = timelineServiceAddress; + } + // Token need not be updated if either address or token service does not + // exist. + String service = delegationToken.getService(); + if ((service == null || service.isEmpty()) && + (collectorAddr == null || collectorAddr.isEmpty())) { + LOG.warn("Timeline token does not have service and timeline service " + + "address is not yet set. Not updating the token"); + return; + } + // No need to update a duplicate token. + if (currentTimelineToken != null && + currentTimelineToken.equals(delegationToken)) { + return; + } + currentTimelineToken = delegationToken; + // Convert the token, sanitize the token service and add it to UGI. + org.apache.hadoop.security.token. + Token timelineToken = + new org.apache.hadoop.security.token. + Token( + delegationToken.getIdentifier().array(), + delegationToken.getPassword().array(), + new Text(delegationToken.getKind()), + service == null ? new Text() : new Text(service)); + // Prefer timeline service address over service coming in the token for + // updating the token service. + InetSocketAddress serviceAddr = + (collectorAddr != null && !collectorAddr.isEmpty()) ? + NetUtils.createSocketAddr(collectorAddr) : + SecurityUtil.getTokenServiceAddr(timelineToken); + SecurityUtil.setTokenService(timelineToken, serviceAddr); + authUgi.addToken(timelineToken); + LOG.info("Updated timeline delegation token " + timelineToken); } @Private http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java index c5b02fd..95595a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.client.api.impl; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -27,11 +32,16 @@ import javax.ws.rs.core.MultivaluedMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.CollectorInfo; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -151,7 +161,7 @@ public class TestTimelineClientV2Impl { maxRetries); c.init(conf); c.start(); - c.setTimelineServiceAddress("localhost:12345"); + c.setTimelineCollectorInfo(CollectorInfo.newInstance("localhost:12345")); try { c.putEntities(new TimelineEntity()); } catch (IOException e) { @@ -311,6 +321,50 @@ public class TestTimelineClientV2Impl { } @Test + public void testSetTimelineToken() throws Exception { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + assertEquals(0, ugi.getTokens().size()); + assertNull("Timeline token in v2 client should not be set", + client.currentTimelineToken); + + Token token = Token.newInstance( + new byte[0], "kind", new byte[0], "service"); + client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token)); + assertNull("Timeline token in v2 client should not be set as token kind " + + "is unexepcted.", client.currentTimelineToken); + assertEquals(0, ugi.getTokens().size()); + + token = Token.newInstance(new byte[0], TimelineDelegationTokenIdentifier. + KIND_NAME.toString(), new byte[0], null); + client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token)); + assertNull("Timeline token in v2 client should not be set as serice is " + + "not set.", client.currentTimelineToken); + assertEquals(0, ugi.getTokens().size()); + + TimelineDelegationTokenIdentifier ident = + new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()), + new Text("renewer"), null); + ident.setSequenceNumber(1); + token = Token.newInstance(ident.getBytes(), + TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0], + "localhost:1234"); + client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, token)); + assertEquals(1, ugi.getTokens().size()); + assertNotNull("Timeline token should be set in v2 client.", + client.currentTimelineToken); + assertEquals(token, client.currentTimelineToken); + + ident.setSequenceNumber(20); + Token newToken = Token.newInstance(ident.getBytes(), + TimelineDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0], + "localhost:1234"); + client.setTimelineCollectorInfo(CollectorInfo.newInstance(null, newToken)); + assertEquals(1, ugi.getTokens().size()); + assertNotEquals(token, client.currentTimelineToken); + assertEquals(newToken, client.currentTimelineToken); + } + + @Test public void testAfterStop() throws Exception { client.setSleepBeforeReturn(true); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 34eddf7..b8192ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -437,7 +438,7 @@ public class NMTimelinePublisher extends CompositeService { String collectorAddr) { TimelineV2Client client = appToClientMap.get(appId); if (client != null) { - client.setTimelineServiceAddress(collectorAddr); + client.setTimelineCollectorInfo(CollectorInfo.newInstance(collectorAddr)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index eb4381d..6a5ef55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; @@ -99,9 +100,9 @@ public class TestTimelineServiceClientIntegration { TimelineV2Client client = TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1)); try { - // set the timeline service address manually - client.setTimelineServiceAddress( - collectorManager.getRestServerBindAddress()); + // Set the timeline service address manually. + client.setTimelineCollectorInfo(CollectorInfo.newInstance( + collectorManager.getRestServerBindAddress())); client.init(conf); client.start(); TimelineEntity entity = new TimelineEntity(); @@ -126,9 +127,9 @@ public class TestTimelineServiceClientIntegration { TimelineV2Client client = TimelineV2Client.createTimelineClient(appId); try { - // set the timeline service address manually - client.setTimelineServiceAddress( - collectorManager.getRestServerBindAddress()); + // Set the timeline service address manually. + client.setTimelineCollectorInfo(CollectorInfo.newInstance( + collectorManager.getRestServerBindAddress())); client.init(conf); client.start(); ClusterEntity cluster = new ClusterEntity(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/08f40bcc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java index bc1594c..75f17fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java @@ -56,6 +56,7 @@ import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -228,7 +229,7 @@ public class TestTimelineAuthFilterForV2 { String restBindAddr = collectorManager.getRestServerBindAddress(); String addr = "localhost" + restBindAddr.substring(restBindAddr.indexOf(":")); - client.setTimelineServiceAddress(addr); + client.setTimelineCollectorInfo(CollectorInfo.newInstance(addr)); client.init(conf); client.start(); return client; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org