tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-768. Consolidate TokenCache, Master and related code. (sseth)
Date Thu, 30 Jan 2014 00:15:28 GMT
Updated Branches:
  refs/heads/master 8645ebccb -> 7160877af


TEZ-768. Consolidate TokenCache, Master and related code. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7160877a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7160877a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7160877a

Branch: refs/heads/master
Commit: 7160877af3c09e13b3b8b33984a867e259ae7c09
Parents: 8645ebc
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jan 29 16:15:08 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jan 29 16:15:08 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   |  6 +-
 .../org/apache/tez/common/TezJobConfig.java     | 10 +--
 .../org/apache/tez/common/security/Master.java  |  2 +
 .../apache/tez/common/security/TokenCache.java  | 95 +++-----------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  2 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  5 +-
 .../app/rm/container/AMContainerHelpers.java    |  2 +-
 .../dag/app/rm/container/TestAMContainer.java   |  4 +-
 .../tez/mapreduce/hadoop/DeprecatedKeys.java    |  2 -
 .../apache/tez/mapreduce/processor/MRTask.java  |  7 +-
 10 files changed, 27 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index ea88297..bd97de1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -185,7 +185,7 @@ public class TezClientUtils {
           + ". Ignoring for now. Errors may occur");
     } else {
       // Obtain credentials.
-      TokenCache.obtainTokensForNamenodes(credentials,
+      TokenCache.obtainTokensForFileSystems(credentials,
           tezJarPaths.toArray(new Path[tezJarPaths.size()]), conf);
     }
     return tezJarResources;
@@ -266,7 +266,7 @@ public class TezClientUtils {
         }
       });
       Path[] paths = Iterators.toArray(pathIter, Path.class);
-      TokenCache.obtainTokensForNamenodes(dagCredentials, paths, conf);
+      TokenCache.obtainTokensForFileSystems(dagCredentials, paths, conf);
     }
   }
 
@@ -318,7 +318,7 @@ public class TezClientUtils {
     }
 
     // Add Staging dir creds to the list of session credentials.
-    TokenCache.obtainTokensForNamenodes(sessionCreds, new Path[] {binaryConfPath}, conf);
+    TokenCache.obtainTokensForFileSystems(sessionCreds, new Path[] {binaryConfPath}, conf);
 
     // Add session specific credentials to the AM credentials.
     amLaunchCredentials.mergeAll(sessionCreds);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index ece2830..f21a55f 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -304,15 +304,7 @@ public class TezJobConfig {
 
   public static final String TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS
= 
       "tez.runtime.intermediate-input.key.secondary.comparator.class";
-  
-  // TODO This should be in DAGConfiguration
-  /* config for tracking the local file where all the credentials for the job
-   * credentials.
-   * Currently not supported.
-   */
-  @Private
-  public static final String DAG_CREDENTIALS_BINARY =  "tez.dag.credentials.binary";
-  
+
   public static final String TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED = "tez.runtime.broadcast.data-via-events.enabled";
   public static final boolean TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED_DEFAULT = false;
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-api/src/main/java/org/apache/tez/common/security/Master.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/Master.java b/tez-api/src/main/java/org/apache/tez/common/security/Master.java
index 285297d..de73d10 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/Master.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/Master.java
@@ -39,6 +39,8 @@ public class Master {
     return conf.get(YarnConfiguration.RM_PRINCIPAL);
   }
 
+  
+  // This needs to go into YARN
   public static InetSocketAddress getMasterAddress(Configuration conf) {
     return conf
         .getSocketAddr(YarnConfiguration.RM_ADDRESS,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java b/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
index f8d9c4c..59472a9 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.tez.common.TezJobConfig;
 
 
 /**
@@ -69,32 +68,22 @@ public class TokenCache {
    * @param conf configuration
    * @throws IOException
    */
-  public static void obtainTokensForNamenodes(Credentials credentials,
+  public static void obtainTokensForFileSystems(Credentials credentials,
       Path[] ps, Configuration conf) throws IOException {
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    obtainTokensForNamenodesInternal(credentials, ps, conf);
+    obtainTokensForFileSystemsInternal(credentials, ps, conf);
   }
 
-  /**
-   * Remove jobtoken referrals which don't make sense in the context
-   * of the task execution.
-   *
-   * @param conf
-   */
-  public static void cleanUpTokenReferral(Configuration conf) {
-    conf.unset(TezJobConfig.DAG_CREDENTIALS_BINARY);
-  }
-
-  static void obtainTokensForNamenodesInternal(Credentials credentials,
+  static void obtainTokensForFileSystemsInternal(Credentials credentials,
       Path[] ps, Configuration conf) throws IOException {
     Set<FileSystem> fsSet = new HashSet<FileSystem>();
     for(Path p: ps) {
       fsSet.add(p.getFileSystem(conf));
     }
     for (FileSystem fs : fsSet) {
-      obtainTokensForNamenodesInternal(fs, credentials, conf);
+      obtainTokensForFileSystemsInternal(fs, credentials, conf);
     }
   }
 
@@ -106,14 +95,14 @@ public class TokenCache {
    * @param conf
    * @throws IOException
    */
-  static void obtainTokensForNamenodesInternal(FileSystem fs, 
+  static void obtainTokensForFileSystemsInternal(FileSystem fs, 
       Credentials credentials, Configuration conf) throws IOException {
+    // TODO Change this to use YARN utilities once YARN-1664 is fixed.
     String delegTokenRenewer = Master.getMasterPrincipal(conf);
     if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
       throw new IOException(
           "Can't get Master Kerberos principal for use as renewer");
     }
-    mergeBinaryTokens(credentials, conf);
 
     final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
                                                      credentials);
@@ -124,82 +113,24 @@ public class TokenCache {
     }
   }
 
-  private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
-    String binaryTokenFilename =
-        conf.get(TezJobConfig.DAG_CREDENTIALS_BINARY);
-    if (binaryTokenFilename != null) {
-      Credentials binary;
-      try {
-        binary = Credentials.readTokenStorageFile(
-            new Path("file:///" +  binaryTokenFilename), conf);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      // supplement existing tokens with the tokens in the binary file
-      creds.mergeAll(binary);
-    }
-  }
-  
-  /**
-   * file name used on HDFS for generated job token
-   */
-  @InterfaceAudience.Private
-  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+  private static final Text SESSION_TOKEN = new Text("SessionToken");
 
   /**
-   * conf setting for job tokens cache file name
-   */
-  @InterfaceAudience.Private
-  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
-  private static final Text JOB_TOKEN = new Text("JobToken");
-  private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
-  
-  /**
-   * load job token from a file
-   * @param conf
-   * @throws IOException
-   */
-  @InterfaceAudience.Private
-  public static Credentials loadTokens(String jobTokenFile, Configuration conf) 
-  throws IOException {
-    Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
-
-    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
-
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Task: Loaded jobTokenFile from: "+
-          localJobTokenFile.toUri().getPath() 
-          +"; num of sec keys  = " + ts.numberOfSecretKeys() +
-          " Number of tokens " +  ts.numberOfTokens());
-    }
-    return ts;
-  }
-  /**
-   * store job token
+   * store session specific token
    * @param t
    */
   @InterfaceAudience.Private
-  public static void setJobToken(Token<? extends TokenIdentifier> t, 
+  public static void setSessionToken(Token<? extends TokenIdentifier> t, 
       Credentials credentials) {
-    credentials.addToken(JOB_TOKEN, t);
+    credentials.addToken(SESSION_TOKEN, t);
   }
   /**
    * 
-   * @return job token
+   * @return session token
    */
   @SuppressWarnings("unchecked")
   @InterfaceAudience.Private
-  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
-    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
-  }
-
-  @InterfaceAudience.Private
-  public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
-    credentials.addSecretKey(SHUFFLE_TOKEN, key);
-  }
-
-  @InterfaceAudience.Private
-  public static byte[] getShuffleSecretKey(Credentials credentials) {
-    return getSecretKey(credentials, SHUFFLE_TOKEN);
+  public static Token<JobTokenIdentifier> getSessionToken(Credentials credentials)
{
+    return (Token<JobTokenIdentifier>) credentials.getToken(SESSION_TOKEN);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index b9af7b8..9f7455a 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -324,7 +324,7 @@ public class YarnTezDagChild {
     UserGroupInformation taskOwner =
       UserGroupInformation.createRemoteUser(tokenIdentifier);
 
-    Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
     SecurityUtil.setTokenService(jobToken, address);
     taskOwner.addToken(jobToken);
     // Will jobToken change across DAGs ?

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8463dec..8d70a05 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -275,7 +275,7 @@ public class DAGAppMaster extends AbstractService {
     sessionToken = new Token<JobTokenIdentifier>(identifier,
         jobTokenSecretManager);
     sessionToken.setService(identifier.getJobId());
-    TokenCache.setJobToken(sessionToken, amTokens);
+    TokenCache.setSessionToken(sessionToken, amTokens);
     // Prepare the TaskAttemptListener server for authentication of Containers
     // TaskAttemptListener gets the information via jobTokenSecretManager.
     jobTokenSecretManager.addTokenForJob(sessionTokenUUID, sessionToken);
@@ -503,7 +503,8 @@ public class DAGAppMaster extends AbstractService {
     } else {
       dagCredentials = new Credentials();
     }
-    TokenCache.setJobToken(sessionToken, dagCredentials);
+    // TODO Does this move to the client in case of work-preserving recovery.
+    TokenCache.setSessionToken(sessionToken, dagCredentials);
 
     // create single dag
     DAG newDag =

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 2a07250..7a74753 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -121,7 +121,7 @@ public class AMContainerHelpers {
       // Add shuffle token
       LOG.info("Putting shuffle token in serviceData");
       serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
-          ShuffleHandler.serializeServiceData(TokenCache.getJobToken(containerCredentials)));
+          ShuffleHandler.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 58d256f..b78704a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -930,7 +930,7 @@ public class TestAMContainer {
     Token<TokenIdentifier> token3 = mock(Token.class);
     
     Credentials containerCredentials = new Credentials();
-    TokenCache.setJobToken(amGenToken, containerCredentials);
+    TokenCache.setSessionToken(amGenToken, containerCredentials);
 
     Text token1Name = new Text("tokenDag1");
     Text token3Name = new Text("tokenDag3");
@@ -1108,7 +1108,7 @@ public class TestAMContainer {
       reset(eventHandler);
       @SuppressWarnings("unchecked")
       Token<JobTokenIdentifier> jobToken = mock(Token.class);
-      TokenCache.setJobToken(jobToken, credentials);
+      TokenCache.setSessionToken(jobToken, credentials);
       amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
           new ContainerContext(localResources, credentials, new HashMap<String, String>(),
"")));
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 2a25eac..621a1b8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -185,8 +185,6 @@ public class DeprecatedKeys {
     registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
     
     registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
-
-    registerMRToRuntimeKeyTranslation(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
     
     registerMRToRuntimeKeyTranslation("map.sort.class", TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7160877a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 0548544..db7aea9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -205,7 +205,7 @@ public abstract class MRTask {
     // Set it in conf, so as to be able to be used the the OutputCommitter.
 
     // Not needed. This is probably being set via the source/consumer meta
-    Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
     if (jobToken != null) {
       // Will MR ever run without a job token.
       SecretKey sk = JobTokenSecretManager.createSecretKey(jobToken
@@ -217,11 +217,6 @@ public abstract class MRTask {
 
     configureLocalDirs();
 
-    if (jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY) != null) {
-      jobConf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
-          jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY));
-    }
-
     // Set up the DistributedCache related configs
     setupDistributedCacheConfig(jobConf);
   }


Mime
View raw message