tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-752. Add an API to DAG to accept a list of URIs for which tokens are needed. (sseth)
Date Wed, 29 Jan 2014 23:54:52 GMT
Updated Branches:
  refs/heads/master 25c32021e -> 8645ebccb


TEZ-752. Add an API to DAG to accept a list of URIs for which tokens are
needed. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8645ebcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8645ebcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8645ebcc

Branch: refs/heads/master
Commit: 8645ebccb1b2ed54660f8d77c7a39c41ef513415
Parents: 25c3202
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jan 29 15:54:31 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jan 29 15:54:31 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/client/AMConfiguration.java  | 11 +++
 .../java/org/apache/tez/client/TezClient.java   | 18 +++-
 .../org/apache/tez/client/TezClientUtils.java   | 97 ++++++++++++++++++--
 .../java/org/apache/tez/client/TezSession.java  | 13 ++-
 .../main/java/org/apache/tez/dag/api/DAG.java   | 54 +++++++++++
 5 files changed, 174 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 3dd6424..132a73c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -38,6 +38,17 @@ public class AMConfiguration {
   private final TezConfiguration amConf;
   private final Credentials credentials;
 
+  /**
+   * @param env
+   *          environment for the AM
+   * @param localResources
+   *          localResources which are required to run the AM
+   * @param conf
+   * @param credentials
+   *          credentials which will be needed in the AM. This includes
+   *          credentials which will be required to localize the specified
+   *          localResources.
+   */
   public AMConfiguration(Map<String, String> env,
       Map<String, LocalResource> localResources,
       TezConfiguration conf, Credentials credentials) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 5754900..95dc798 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -81,9 +82,16 @@ public class TezClient {
       DAG dag, AMConfiguration amConfig)
           throws TezException, IOException {
     try {
-      ApplicationSubmissionContext appContext =
-          TezClientUtils.createApplicationSubmissionContext(conf, appId, dag,
-              dag.getName(), amConfig, getTezJarResources());
+      // Use the AMCredentials object in client mode, since this won't be re-used.
+      // Ensures we don't fetch credentially unnecessarily if the user has already provided them.
+      Credentials credentials = amConfig.getCredentials();
+      if (credentials == null) {
+        credentials = new Credentials();
+      }
+      // Add credentials for tez-local resources.
+      Map<String, LocalResource> tezJarResources = getTezJarResources(credentials);
+      ApplicationSubmissionContext appContext = TezClientUtils.createApplicationSubmissionContext(
+          conf, appId, dag, dag.getName(), amConfig, tezJarResources, credentials);
       LOG.info("Submitting DAG to YARN"
           + ", applicationId=" + appId);
       yarnClient.submitApplication(appContext);
@@ -108,10 +116,10 @@ public class TezClient {
     }
   }
 
-  private synchronized Map<String, LocalResource> getTezJarResources()
+  private synchronized Map<String, LocalResource> getTezJarResources(Credentials credentials)
       throws IOException {
     if (tezJarResources == null) {
-      tezJarResources = TezClientUtils.setupTezJarsLocalResources(conf);
+      tezJarResources = TezClientUtils.setupTezJarsLocalResources(conf, credentials);
     }
     return tezJarResources;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 214df25..ea88297 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -34,6 +34,8 @@ import java.util.TreeMap;
 import java.util.Vector;
 import java.util.Map.Entry;
 
+import javax.annotation.Nullable;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -87,6 +89,10 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 public class TezClientUtils {
 
@@ -101,13 +107,19 @@ public class TezClientUtils {
 
   /**
    * Setup LocalResource map for Tez jars based on provided Configuration
-   * @param conf Configuration to use to access Tez jars' locations
+   * 
+   * @param conf
+   *          Configuration to use to access Tez jars' locations
+   * @param credentials
+   *          a credentials instance into which tokens for the Tez local
+   *          resources will be populated
    * @return Map of LocalResources to use when launching Tez AM
    * @throws IOException
    */
   static Map<String, LocalResource> setupTezJarsLocalResources(
-      TezConfiguration conf)
+      TezConfiguration conf, Credentials credentials)
       throws IOException {
+    Preconditions.checkNotNull(credentials, "A non-null credentials object should be specified");
     Map<String, LocalResource> tezJarResources =
         new TreeMap<String, LocalResource>();
 
@@ -120,6 +132,8 @@ public class TezClientUtils {
           + ", " + TezConfiguration.TEZ_LIB_URIS
           + " is not defined in the configurartion");
     }
+    
+    List<Path> tezJarPaths = Lists.newArrayListWithCapacity(tezJarUris.length);
 
     for (String tezJarUri : tezJarUris) {
       URI uri;
@@ -139,6 +153,8 @@ public class TezClientUtils {
       }
       Path p = new Path(uri);
       FileSystem pathfs = p.getFileSystem(conf);
+      p = pathfs.makeQualified(p);
+      tezJarPaths.add(p);
       RemoteIterator<LocatedFileStatus> iter = pathfs.listFiles(p, false);
       while (iter.hasNext()) {
         LocatedFileStatus fStatus = iter.next();
@@ -163,9 +179,14 @@ public class TezClientUtils {
                 fStatus.getModificationTime()));
       }
     }
+
     if (tezJarResources.isEmpty()) {
       LOG.warn("No tez jars found in configured locations"
           + ". Ignoring for now. Errors may occur");
+    } else {
+      // Obtain credentials.
+      TokenCache.obtainTokensForNamenodes(credentials,
+          tezJarPaths.toArray(new Path[tezJarPaths.size()]), conf);
     }
     return tezJarResources;
   }
@@ -208,6 +229,48 @@ public class TezClientUtils {
   }
 
   /**
+   * Obtains tokens for the DAG based on the list of URIs setup in the DAG. The
+   * fetched credentials are populated back into the DAG and can be retrieved
+   * via dag.getCredentials
+   * 
+   * @param dag
+   *          the dag for which credentials need to be setup
+   * @param sessionCredentials
+   *          session credentials which have already been obtained, and will be
+   *          required for the DAG
+   * @param conf
+   * @throws IOException
+   */
+  @Private
+  static void setupDAGCredentials(DAG dag, Credentials sessionCredentials,
+      Configuration conf) throws IOException {
+
+    Preconditions.checkNotNull(sessionCredentials);
+    Credentials dagCredentials = dag.getCredentials();
+    if (dagCredentials == null) {
+      dagCredentials = new Credentials();
+      dag.setCredentials(dagCredentials);
+    }
+    // All session creds are required for the DAG.
+    dagCredentials.mergeAll(sessionCredentials);
+    
+    // Add additional credentials based on any URIs that the user may have specified.
+    
+    // Obtain Credentials for any paths that the user may have configured.
+    List<URI> uris = dag.getURIsForCredentials();
+    if (uris != null && !uris.isEmpty()) {
+      Iterator<Path> pathIter = Iterators.transform(uris.iterator(), new Function<URI, Path>() {
+        @Override
+        public Path apply(@Nullable URI input) {
+          return new Path(input);
+        }
+      });
+      Path[] paths = Iterators.toArray(pathIter, Path.class);
+      TokenCache.obtainTokensForNamenodes(dagCredentials, paths, conf);
+    }
+  }
+
+  /**
    * Create an ApplicationSubmissionContext to launch a Tez AM
    * @param conf TezConfiguration
    * @param appId Application Id
@@ -215,16 +278,18 @@ public class TezClientUtils {
    * @param amName Name for the application
    * @param amConfig AM Configuration
    * @param tezJarResources Resources to be used by the AM
+   * @param sessionCredentials the credential object which will be populated with session specific
    * @return an ApplicationSubmissionContext to launch a Tez AM
    * @throws IOException
    * @throws YarnException
    */
   static ApplicationSubmissionContext createApplicationSubmissionContext(
       TezConfiguration conf, ApplicationId appId, DAG dag, String amName,
-      AMConfiguration amConfig,
-      Map<String, LocalResource> tezJarResources)
+      AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
+      Credentials sessionCreds)
           throws IOException, YarnException{
 
+    Preconditions.checkNotNull(sessionCreds);
     FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
         amConfig.getStagingDir());
     Path binaryConfPath =  new Path(amConfig.getStagingDir(),
@@ -243,20 +308,31 @@ public class TezClientUtils {
       LOG.debug("AppMaster capability = " + capability);
     }
 
+    // Setup required Credentials for the AM launch. DAG specific credentials
+    // are handled separately.
     ByteBuffer securityTokens = null;
     // Setup security tokens
-    Credentials credentials = amConfig.getCredentials();
-    if (credentials == null) {
-      credentials = new Credentials();
+    Credentials amLaunchCredentials = new Credentials();
+    if (amConfig.getCredentials() != null) {
+      amLaunchCredentials.addAll(amConfig.getCredentials());
     }
 
-    // Obtain Credentials for the staging dir.
-    TokenCache.obtainTokensForNamenodes(credentials, new Path[] { binaryConfPath }, conf);
+    // Add Staging dir creds to the list of session credentials.
+    TokenCache.obtainTokensForNamenodes(sessionCreds, new Path[] {binaryConfPath}, conf);
+
+    // Add session specific credentials to the AM credentials.
+    amLaunchCredentials.mergeAll(sessionCreds);
 
     DataOutputBuffer dob = new DataOutputBuffer();
-    credentials.writeTokenStorageToStream(dob);
+    amLaunchCredentials.writeTokenStorageToStream(dob);
     securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
 
+    // Need to set credentials based on DAG and the URIs which have been set for the DAG.
+
+    if (dag != null) {
+      setupDAGCredentials(dag, sessionCreds, conf);
+    }
+
     // Setup the command to run the AM
     List<String> vargs = new ArrayList<String>(8);
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
@@ -308,6 +384,7 @@ public class TezClientUtils {
     Map<String, LocalResource> localResources =
         new TreeMap<String, LocalResource>();
 
+    // Not fetching credentials for AMLocalResources. Expect this to be provided via AMCredentials.
     if (amConfig.getLocalResources() != null) {
       localResources.putAll(amConfig.getLocalResources());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index e0cbc93..e452616 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -41,9 +42,9 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -60,6 +61,8 @@ public class TezSession {
   private YarnClient yarnClient;
   private boolean sessionStarted = false;
   private boolean sessionStopped = false;
+  /** Tokens which will be required for all DAGs submitted to this session. */
+  private Credentials sessionCredentials = new Credentials();
 
   public TezSession(String sessionName,
       ApplicationId applicationId,
@@ -86,7 +89,7 @@ public class TezSession {
 
     Map<String, LocalResource> tezJarResources =
         TezClientUtils.setupTezJarsLocalResources(
-          sessionConfig.getTezConfiguration());
+          sessionConfig.getTezConfiguration(), sessionCredentials);
 
     if (sessionConfig.getSessionResources() != null
       && !sessionConfig.getSessionResources().isEmpty()) {
@@ -103,7 +106,7 @@ public class TezSession {
           TezClientUtils.createApplicationSubmissionContext(
               sessionConfig.getTezConfiguration(), applicationId,
               null, sessionName, sessionConfig.getAMConfiguration(),
-              tezJarResources);
+              tezJarResources, sessionCredentials);
       // Set Tez Sessions to not retry on AM crashes
       appContext.setMaxAppAttempts(1);
       yarnClient.submitApplication(appContext);
@@ -137,7 +140,9 @@ public class TezSession {
         + ", sessionName=" + sessionName
         + ", applicationId=" + applicationId);
 
-    // setup env
+    // Obtain DAG specific credentials.
+    TezClientUtils.setupDAGCredentials(dag, sessionCredentials, sessionConfig.getTezConfiguration());
+
     Map<String, String> environment = TezClientUtils
         .createEnvironment(sessionConfig.getYarnConfiguration());
     for (Vertex v : dag.getVertices()) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8645ebcc/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 3840110..ad6cd8c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -17,11 +17,13 @@
  */
 package org.apache.tez.dag.api;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -48,6 +50,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
 import org.apache.commons.collections4.BidiMap;
 
@@ -55,7 +58,9 @@ public class DAG { // FIXME rename to Topology
   final BidiMap<String, Vertex> vertices;
   final List<Edge> edges;
   final String name;
+  final List<URI> urisForCredentials = new LinkedList<URI>();
   Credentials credentials;
+  
 
   public DAG(String name) {
     this.vertices = new DualLinkedHashBidiMap<String, Vertex>();
@@ -76,12 +81,61 @@ public class DAG { // FIXME rename to Topology
     return vertices.get(vertexName);
   }
   
+  /**
+   * One of the methods that can be used to provide information about required
+   * Credentials when running on a secure cluster. A combination of this and
+   * addURIsForCredentials should be used to specify information about all
+   * credentials required by a DAG. AM specific credentials are not used when
+   * executing a DAG.
+   * 
+   * Set credentials which will be required to run this dag. This method can be
+   * used if the client has already obtained some or all of the required
+   * credentials.
+   * 
+   * @param credentials
+   * @return
+   */
   public synchronized DAG setCredentials(Credentials credentials) {
     this.credentials = credentials;
     return this;
   }
 
   @Private
+  public synchronized Credentials getCredentials() {
+    return this.credentials;
+  }
+
+  /**
+   * One of the methods that can be used to provide information about required
+   * Credentials when running on a secure cluster. A combination of this and
+   * setCredentials should be used to specify information about all
+   * credentials required by a DAG. AM specific credentials are not used when
+   * executing a DAG.
+   * 
+   * This method can be used to specify a list of URIs for which Credentials need to be
+   * obtained so that the job can run.
+   * An incremental list of URIs can be provided by making multiple calls to the method.
+   *  
+   * @param uris a list of {@link URI}s
+   * @return the DAG instance being used
+   */
+  public synchronized DAG addURIsForCredentials(List<URI> uris) {
+    Preconditions.checkNotNull(uris, "URIs cannot be null");
+    urisForCredentials.addAll(uris);
+    return this;
+  }
+
+  /**
+   * 
+   * @return an unmodifiable list representing the URIs for which credentials
+   *         are required.
+   */
+  @Private
+  public synchronized List<URI> getURIsForCredentials() {
+    return Collections.unmodifiableList(urisForCredentials);
+  }
+  
+  @Private
   public synchronized Set<Vertex> getVertices() {
     return Collections.unmodifiableSet(this.vertices.values());
   }


Mime
View raw message