Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ECABD10F63 for ; Wed, 29 Jan 2014 23:55:21 +0000 (UTC) Received: (qmail 87995 invoked by uid 500); 29 Jan 2014 23:55:21 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 87958 invoked by uid 500); 29 Jan 2014 23:55:20 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 87951 invoked by uid 99); 29 Jan 2014 23:55:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Jan 2014 23:55:20 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 29 Jan 2014 23:55:15 +0000 Received: (qmail 87543 invoked by uid 99); 29 Jan 2014 23:54:53 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Jan 2014 23:54:53 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E8A1E910E74; Wed, 29 Jan 2014 23:54:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-752. Add an API to DAG to accept a list of URIs for which tokens are needed. 
(sseth) Date: Wed, 29 Jan 2014 23:54:52 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 25c32021e -> 8645ebccb TEZ-752. Add an API to DAG to accept a list of URIs for which tokens are needed. (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8645ebcc Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8645ebcc Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8645ebcc Branch: refs/heads/master Commit: 8645ebccb1b2ed54660f8d77c7a39c41ef513415 Parents: 25c3202 Author: Siddharth Seth Authored: Wed Jan 29 15:54:31 2014 -0800 Committer: Siddharth Seth Committed: Wed Jan 29 15:54:31 2014 -0800 ---------------------------------------------------------------------- .../org/apache/tez/client/AMConfiguration.java | 11 +++ .../java/org/apache/tez/client/TezClient.java | 18 +++- .../org/apache/tez/client/TezClientUtils.java | 97 ++++++++++++++++++-- .../java/org/apache/tez/client/TezSession.java | 13 ++- .../main/java/org/apache/tez/dag/api/DAG.java | 54 +++++++++++ 5 files changed, 174 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java index 3dd6424..132a73c 100644 --- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java @@ -38,6 +38,17 @@ public class AMConfiguration { private final TezConfiguration amConf; private final Credentials credentials; + /** + * @param env + * environment for the AM + * @param localResources + * 
localResources which are required to run the AM + * @param conf + * @param credentials + * credentials which will be needed in the AM. This includes + * credentials which will be required to localize the specified + * localResources. + */ public AMConfiguration(Map env, Map localResources, TezConfiguration conf, Credentials credentials) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 5754900..95dc798 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -81,9 +82,16 @@ public class TezClient { DAG dag, AMConfiguration amConfig) throws TezException, IOException { try { - ApplicationSubmissionContext appContext = - TezClientUtils.createApplicationSubmissionContext(conf, appId, dag, - dag.getName(), amConfig, getTezJarResources()); + // Use the AMCredentials object in client mode, since this won't be re-used. + // Ensures we don't fetch credentials unnecessarily if the user has already provided them. + Credentials credentials = amConfig.getCredentials(); + if (credentials == null) { + credentials = new Credentials(); + } + // Add credentials for tez-local resources. 
+ Map tezJarResources = getTezJarResources(credentials); + ApplicationSubmissionContext appContext = TezClientUtils.createApplicationSubmissionContext( + conf, appId, dag, dag.getName(), amConfig, tezJarResources, credentials); LOG.info("Submitting DAG to YARN" + ", applicationId=" + appId); yarnClient.submitApplication(appContext); @@ -108,10 +116,10 @@ public class TezClient { } } - private synchronized Map getTezJarResources() + private synchronized Map getTezJarResources(Credentials credentials) throws IOException { if (tezJarResources == null) { - tezJarResources = TezClientUtils.setupTezJarsLocalResources(conf); + tezJarResources = TezClientUtils.setupTezJarsLocalResources(conf, credentials); } return tezJarResources; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 214df25..ea88297 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -34,6 +34,8 @@ import java.util.TreeMap; import java.util.Vector; import java.util.Map.Entry; +import javax.annotation.Nullable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -87,6 +89,10 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; public class TezClientUtils { @@ -101,13 +107,19 @@ public class TezClientUtils { /** * 
Setup LocalResource map for Tez jars based on provided Configuration - * @param conf Configuration to use to access Tez jars' locations + * + * @param conf + * Configuration to use to access Tez jars' locations + * @param credentials + * a credentials instance into which tokens for the Tez local + * resources will be populated * @return Map of LocalResources to use when launching Tez AM * @throws IOException */ static Map setupTezJarsLocalResources( - TezConfiguration conf) + TezConfiguration conf, Credentials credentials) throws IOException { + Preconditions.checkNotNull(credentials, "A non-null credentials object should be specified"); Map tezJarResources = new TreeMap(); @@ -120,6 +132,8 @@ public class TezClientUtils { + ", " + TezConfiguration.TEZ_LIB_URIS + " is not defined in the configurartion"); } + + List tezJarPaths = Lists.newArrayListWithCapacity(tezJarUris.length); for (String tezJarUri : tezJarUris) { URI uri; @@ -139,6 +153,8 @@ public class TezClientUtils { } Path p = new Path(uri); FileSystem pathfs = p.getFileSystem(conf); + p = pathfs.makeQualified(p); + tezJarPaths.add(p); RemoteIterator iter = pathfs.listFiles(p, false); while (iter.hasNext()) { LocatedFileStatus fStatus = iter.next(); @@ -163,9 +179,14 @@ public class TezClientUtils { fStatus.getModificationTime())); } } + if (tezJarResources.isEmpty()) { LOG.warn("No tez jars found in configured locations" + ". Ignoring for now. Errors may occur"); + } else { + // Obtain credentials. + TokenCache.obtainTokensForNamenodes(credentials, + tezJarPaths.toArray(new Path[tezJarPaths.size()]), conf); } return tezJarResources; } @@ -208,6 +229,48 @@ public class TezClientUtils { } /** + * Obtains tokens for the DAG based on the list of URIs setup in the DAG. 
The + * fetched credentials are populated back into the DAG and can be retrieved + * via dag.getCredentials + * + * @param dag + * the dag for which credentials need to be setup + * @param sessionCredentials + * session credentials which have already been obtained, and will be + * required for the DAG + * @param conf + * @throws IOException + */ + @Private + static void setupDAGCredentials(DAG dag, Credentials sessionCredentials, + Configuration conf) throws IOException { + + Preconditions.checkNotNull(sessionCredentials); + Credentials dagCredentials = dag.getCredentials(); + if (dagCredentials == null) { + dagCredentials = new Credentials(); + dag.setCredentials(dagCredentials); + } + // All session creds are required for the DAG. + dagCredentials.mergeAll(sessionCredentials); + + // Add additional credentials based on any URIs that the user may have specified. + + // Obtain Credentials for any paths that the user may have configured. + List uris = dag.getURIsForCredentials(); + if (uris != null && !uris.isEmpty()) { + Iterator pathIter = Iterators.transform(uris.iterator(), new Function() { + @Override + public Path apply(@Nullable URI input) { + return new Path(input); + } + }); + Path[] paths = Iterators.toArray(pathIter, Path.class); + TokenCache.obtainTokensForNamenodes(dagCredentials, paths, conf); + } + } + + /** * Create an ApplicationSubmissionContext to launch a Tez AM * @param conf TezConfiguration * @param appId Application Id @@ -215,16 +278,18 @@ public class TezClientUtils { * @param amName Name for the application * @param amConfig AM Configuration * @param tezJarResources Resources to be used by the AM + * @param sessionCreds the credentials object which will be populated with session specific credentials * @return an ApplicationSubmissionContext to launch a Tez AM * @throws IOException * @throws YarnException */ static ApplicationSubmissionContext createApplicationSubmissionContext( TezConfiguration conf, ApplicationId appId, DAG dag, String amName, - 
AMConfiguration amConfig, - Map tezJarResources) + AMConfiguration amConfig, Map tezJarResources, + Credentials sessionCreds) throws IOException, YarnException{ + Preconditions.checkNotNull(sessionCreds); FileSystem fs = TezClientUtils.ensureStagingDirExists(conf, amConfig.getStagingDir()); Path binaryConfPath = new Path(amConfig.getStagingDir(), @@ -243,20 +308,31 @@ public class TezClientUtils { LOG.debug("AppMaster capability = " + capability); } + // Setup required Credentials for the AM launch. DAG specific credentials + // are handled separately. ByteBuffer securityTokens = null; // Setup security tokens - Credentials credentials = amConfig.getCredentials(); - if (credentials == null) { - credentials = new Credentials(); + Credentials amLaunchCredentials = new Credentials(); + if (amConfig.getCredentials() != null) { + amLaunchCredentials.addAll(amConfig.getCredentials()); } - // Obtain Credentials for the staging dir. - TokenCache.obtainTokensForNamenodes(credentials, new Path[] { binaryConfPath }, conf); + // Add Staging dir creds to the list of session credentials. + TokenCache.obtainTokensForNamenodes(sessionCreds, new Path[] {binaryConfPath}, conf); + + // Add session specific credentials to the AM credentials. + amLaunchCredentials.mergeAll(sessionCreds); DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); + amLaunchCredentials.writeTokenStorageToStream(dob); securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + // Need to set credentials based on DAG and the URIs which have been set for the DAG. + + if (dag != null) { + setupDAGCredentials(dag, sessionCreds, conf); + } + // Setup the command to run the AM List vargs = new ArrayList(8); vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); @@ -308,6 +384,7 @@ public class TezClientUtils { Map localResources = new TreeMap(); + // Not fetching credentials for AMLocalResources. Expect this to be provided via AMCredentials. 
if (amConfig.getLocalResources() != null) { localResources.putAll(amConfig.getLocalResources()); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/client/TezSession.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java index e0cbc93..e452616 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -41,9 +42,9 @@ import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto; -import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; +import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import com.google.common.annotations.VisibleForTesting; @@ -60,6 +61,8 @@ public class TezSession { private YarnClient yarnClient; private boolean sessionStarted = false; private boolean sessionStopped = false; + /** Tokens which will be required for all DAGs submitted to this session. 
*/ + private Credentials sessionCredentials = new Credentials(); public TezSession(String sessionName, ApplicationId applicationId, @@ -86,7 +89,7 @@ public class TezSession { Map tezJarResources = TezClientUtils.setupTezJarsLocalResources( - sessionConfig.getTezConfiguration()); + sessionConfig.getTezConfiguration(), sessionCredentials); if (sessionConfig.getSessionResources() != null && !sessionConfig.getSessionResources().isEmpty()) { @@ -103,7 +106,7 @@ public class TezSession { TezClientUtils.createApplicationSubmissionContext( sessionConfig.getTezConfiguration(), applicationId, null, sessionName, sessionConfig.getAMConfiguration(), - tezJarResources); + tezJarResources, sessionCredentials); // Set Tez Sessions to not retry on AM crashes appContext.setMaxAppAttempts(1); yarnClient.submitApplication(appContext); @@ -137,7 +140,9 @@ public class TezSession { + ", sessionName=" + sessionName + ", applicationId=" + applicationId); - // setup env + // Obtain DAG specific credentials. + TezClientUtils.setupDAGCredentials(dag, sessionCredentials, sessionConfig.getTezConfiguration()); + Map environment = TezClientUtils .createEnvironment(sessionConfig.getYarnConfiguration()); for (Vertex v : dag.getVertices()) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 3840110..ad6cd8c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -17,11 +17,13 @@ */ package org.apache.tez.dag.api; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import 
java.util.Map.Entry; @@ -48,6 +50,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; +import com.google.common.base.Preconditions; import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap; import org.apache.commons.collections4.BidiMap; @@ -55,7 +58,9 @@ public class DAG { // FIXME rename to Topology final BidiMap vertices; final List edges; final String name; + final List urisForCredentials = new LinkedList(); Credentials credentials; + public DAG(String name) { this.vertices = new DualLinkedHashBidiMap(); @@ -76,12 +81,61 @@ public class DAG { // FIXME rename to Topology return vertices.get(vertexName); } + /** + * One of the methods that can be used to provide information about required + * Credentials when running on a secure cluster. A combination of this and + * addURIsForCredentials should be used to specify information about all + * credentials required by a DAG. AM specific credentials are not used when + * executing a DAG. + * + * Set credentials which will be required to run this dag. This method can be + * used if the client has already obtained some or all of the required + * credentials. + * + * @param credentials + * @return + */ public synchronized DAG setCredentials(Credentials credentials) { this.credentials = credentials; return this; } @Private + public synchronized Credentials getCredentials() { + return this.credentials; + } + + /** + * One of the methods that can be used to provide information about required + * Credentials when running on a secure cluster. A combination of this and + * setCredentials should be used to specify information about all + * credentials required by a DAG. AM specific credentials are not used when + * executing a DAG. + * + * This method can be used to specify a list of URIs for which Credentials need to be + * obtained so that the job can run. 
+ * An incremental list of URIs can be provided by making multiple calls to the method. + * + * @param uris a list of {@link URI}s + * @return the DAG instance being used + */ + public synchronized DAG addURIsForCredentials(List uris) { + Preconditions.checkNotNull(uris, "URIs cannot be null"); + urisForCredentials.addAll(uris); + return this; + } + + /** + * + * @return an unmodifiable list representing the URIs for which credentials + * are required. + */ + @Private + public synchronized List getURIsForCredentials() { + return Collections.unmodifiableList(urisForCredentials); + } + + @Private public synchronized Set getVertices() { return Collections.unmodifiableSet(this.vertices.values()); }