Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4ABE710B4A for ; Sat, 21 Feb 2015 01:21:15 +0000 (UTC) Received: (qmail 3427 invoked by uid 500); 21 Feb 2015 01:21:09 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 3333 invoked by uid 500); 21 Feb 2015 01:21:09 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 3317 invoked by uid 99); 21 Feb 2015 01:21:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 21 Feb 2015 01:21:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C6319E03E4; Sat, 21 Feb 2015 01:21:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 21 Feb 2015 01:21:13 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/16] tez git commit: TEZ-2130. Send the sessionToken as part of the AM CLC. (sseth) TEZ-2130. Send the sessionToken as part of the AM CLC. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c9a74d77 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c9a74d77 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c9a74d77 Branch: refs/heads/TEZ-2003 Commit: c9a74d77b2d97a30f76fc498c243eca0a603077b Parents: 49f76ad Author: Siddharth Seth Authored: Fri Feb 20 17:19:03 2015 -0800 Committer: Siddharth Seth Committed: Fri Feb 20 17:19:03 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/client/TezClientUtils.java | 10 +++- .../org/apache/tez/common/TezCommonUtils.java | 20 +++++++ .../apache/tez/client/TestTezClientUtils.java | 59 ++++++++++++++++++-- .../app/rm/container/AMContainerHelpers.java | 26 +-------- 5 files changed, 87 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 77b0985..c2d5e75 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2130. Send the sessionToken as part of the AM CLC. TEZ-1935. Organization should be removed from http://tez.apache.org/team-list.html. TEZ-2009. Change license/copyright headers to 2015. TEZ-2085. PipelinedSorter should bail out (on BufferOverflowException) instead of retrying continuously. http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 597789c..69bc08e 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -621,10 +622,17 @@ public class TezClientUtils { } } + // Send the shuffle token as part of the AM launch context, so that the NM running the AM can + // provide this to AuxServices running on the AM node - in case tasks run within the AM, + // and no other task runs on this node. + Map serviceData = new HashMap(); + serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, + TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(amLaunchCredentials))); + // Setup ContainerLaunchContext for AM container ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(amLocalResources, environment, - vargsFinal, null, securityTokens, acls); + vargsFinal, serviceData, securityTokens, acls); // Set up the ApplicationSubmissionContext ApplicationSubmissionContext appContext = Records http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java index fe570a5..05e868c 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java @@ -439,4 +439,24 @@ public class TezCommonUtils { return str.split("\\s*,\\s*"); } + + /** + * A helper function to serialize the JobTokenIdentifier to be sent to the + * ShuffleHandler as ServiceData. + * + * *NOTE* This is a copy of what is done by the MapReduce ShuffleHandler. Not using that directly + * to avoid a dependency on mapreduce. + * + * @param jobToken + * the job token to be used for authentication of shuffle data + * requests. + * @return the serialized version of the jobToken. + */ + public static ByteBuffer serializeServiceData(Token jobToken) + throws IOException { + // TODO these bytes should be versioned + DataOutputBuffer jobToken_dob = new DataOutputBuffer(); + jobToken.write(jobToken_dob); + return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java index 468361b..ea73ab3 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java @@ -29,7 +29,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -43,16 +45,23 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -196,20 +205,59 @@ public class TestTezClientUtils { } @Test(timeout = 5000) + public void testSessionTokenInAmClc() throws IOException, YarnException { + + TezConfiguration tezConf = new TezConfiguration(); + + ApplicationId appId = ApplicationId.newInstance(1000, 1); + DAG dag = DAG.create("testdag"); + dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1) + .setTaskLaunchCmdOpts("initialLaunchOpts")); + + Credentials credentials = new Credentials(); + JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); + TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials); + Token jobToken = TokenCache.getSessionToken(credentials); + assertNotNull(jobToken); + + AMConfiguration amConf = + new AMConfiguration(tezConf, new HashMap(), credentials); + ApplicationSubmissionContext appSubmissionContext = + TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, + new HashMap(), credentials, false, new TezApiVersionInfo(), + mock(HistoryACLPolicyManager.class)); + + ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec(); + Map amServiceData = amClc.getServiceData(); + assertNotNull(amServiceData); + assertEquals(1, amServiceData.size()); + + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(amServiceData.values().iterator().next()); + Token jtSent = new Token(); + jtSent.readFields(dibb); + + assertTrue(Arrays.equals(jobToken.getIdentifier(), jtSent.getIdentifier())); + } + + @Test(timeout = 5000) public void testAMLoggingOptsSimple() throws IOException, YarnException { TezConfiguration tezConf = new TezConfiguration(); tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "WARN"); ApplicationId appId = ApplicationId.newInstance(1000, 1); + Credentials credentials = new Credentials(); + JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); + TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials); DAG dag = DAG.create("testdag"); dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1) .setTaskLaunchCmdOpts("initialLaunchOpts")); AMConfiguration amConf = - new AMConfiguration(tezConf, new HashMap(), new Credentials()); + new AMConfiguration(tezConf, new HashMap(), credentials); ApplicationSubmissionContext appSubmissionContext = TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, - new HashMap(), new Credentials(), false, new TezApiVersionInfo(), + new HashMap(), credentials, false, new TezApiVersionInfo(), mock(HistoryACLPolicyManager.class)); List expectedCommands = new LinkedList(); @@ -238,14 +286,17 @@ public class TestTezClientUtils { tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "WARN;org.apache.hadoop.ipc=DEBUG;org.apache.hadoop.security=DEBUG"); ApplicationId appId = ApplicationId.newInstance(1000, 1); + Credentials credentials = new Credentials(); + JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); + TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials); DAG dag = DAG.create("testdag"); dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1) .setTaskLaunchCmdOpts("initialLaunchOpts")); AMConfiguration amConf = - new AMConfiguration(tezConf, new HashMap(), new Credentials()); + new AMConfiguration(tezConf, new HashMap(), credentials); ApplicationSubmissionContext appSubmissionContext = TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, - new HashMap(), new Credentials(), false, new TezApiVersionInfo(), + new HashMap(), credentials, false, new TezApiVersionInfo(), mock(HistoryACLPolicyManager.class)); List expectedCommands = new LinkedList(); http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index b776349..d1b2ea8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -46,7 +45,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.client.TezClientUtils; -import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; @@ -121,7 +120,7 @@ public class AMContainerHelpers { // Add shuffle token LOG.info("Putting shuffle token in serviceData"); serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, - serializeServiceData(TokenCache.getSessionToken(containerCredentials))); + TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials))); } catch (IOException e) { throw new TezUncheckedException(e); } @@ -211,25 +210,4 @@ public class AMContainerHelpers { return container; } - - /** - * A helper function to serialize the JobTokenIdentifier to be sent to the - * ShuffleHandler as ServiceData. - * - * *NOTE* This is a copy of what is done by the MapReduce ShuffleHandler. Not using that directly - * to avoid a dependency on mapreduce. - * - * @param jobToken - * the job token to be used for authentication of shuffle data - * requests. - * @return the serialized version of the jobToken. - */ - private static ByteBuffer serializeServiceData(Token jobToken) - throws IOException { - // TODO these bytes should be versioned - DataOutputBuffer jobToken_dob = new DataOutputBuffer(); - jobToken.write(jobToken_dob); - return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()); - } - }