tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-1879. Create local UGI instances for each task and the AM, when running in LocalMode. (sseth)
Date Fri, 16 Jan 2015 19:21:17 GMT
Repository: tez
Updated Branches:
  refs/heads/master 2ff8c945a -> ea46f459c


TEZ-1879. Create local UGI instances for each task and the AM, when
running in LocalMode. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ea46f459
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ea46f459
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ea46f459

Branch: refs/heads/master
Commit: ea46f459c0e88b0f9b0c714f8c6ac9d9a6f03c5e
Parents: 2ff8c94
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Jan 16 11:20:57 2015 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jan 16 11:20:57 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/TezCommonUtils.java   | 14 +++++++
 .../java/org/apache/tez/client/LocalClient.java | 24 +++++------
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 43 +++++++++-----------
 .../app/launcher/LocalContainerLauncher.java    | 12 ++++--
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  8 +++-
 .../org/apache/tez/dag/app/MockLocalClient.java |  5 ++-
 .../org/apache/tez/runtime/task/TezChild.java   | 22 ++++++----
 8 files changed, 76 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 418475e..af4c60e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1879. Create local UGI instances for each task and the AM, when running in LocalMode.
   TEZ-1661. LocalTaskScheduler hangs when shutdown.
   TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
   TEZ-1951. Fix general findbugs warnings in tez-dag.

http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index 685c728..fe570a5 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -396,6 +397,19 @@ public class TezCommonUtils {
     return bb;
   }
 
+  public static Credentials parseCredentialsBytes(byte[] credentialsBytes) throws IOException {
+    Credentials credentials = new Credentials();
+    DataInputBuffer dib = new DataInputBuffer();
+    try {
+      byte[] tokenBytes = credentialsBytes;
+      dib.reset(tokenBytes, tokenBytes.length);
+      credentials.readTokenStorageStream(dib);
+      return credentials;
+    } finally {
+      dib.close();
+    }
+  }
+
   public static void logCredentials(Log log, Credentials credentials, String identifier) {
     if (log.isDebugEnabled()) {
       log.debug(getCredentialsInfo(credentials, identifier));

http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 94b8474..f309b02 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -23,7 +23,6 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -36,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -46,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Logger;
-import org.apache.tez.common.EnvironmentUpdateUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -274,17 +271,15 @@ public class LocalClient extends FrameworkClient {
           fs.mkdirs(logDir);
           fs.mkdirs(localDir);
 
+          UserGroupInformation.setConfiguration(conf);
           // Add session specific credentials to the AM credentials.
-          UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
           ByteBuffer tokens = appContext.getAMContainerSpec().getTokens();
 
+          Credentials amCredentials;
           if (tokens != null) {
-            Credentials amCredentials = new Credentials();
-            DataInputByteBuffer dibb = new DataInputByteBuffer();
-            dibb.reset(tokens);
-            amCredentials.readTokenStorageStream(dibb);
-            tokens.rewind();
-            currentUser.addCredentials(amCredentials);
+            amCredentials = TezCommonUtils.parseCredentialsBytes(tokens.array());
+          } else {
+            amCredentials = new Credentials();
           }
 
           // Construct, initialize, and start the DAGAppMaster
@@ -298,9 +293,10 @@ public class LocalClient extends FrameworkClient {
           dagAppMaster =
               createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
                   new SystemClock(), appSubmitTime, isSession, userDir.toUri().getPath(),
-                  new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()});
+                  new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
+                  amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
           clientHandler = new DAGClientHandler(dagAppMaster);
-          DAGAppMaster.initAndStartAppMaster(dagAppMaster, currentUser.getShortUserName());
+          DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
 
         } catch (Throwable t) {
           LOG.fatal("Error starting DAGAppMaster", t);
@@ -323,10 +319,10 @@ public class LocalClient extends FrameworkClient {
   protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
       Clock clock, long appSubmitTime, boolean isSession, String userDir,
-      String[] localDirs, String[] logDirs) {
+      String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) {
     return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
-        versionInfo.getVersion(), 1);
+        versionInfo.getVersion(), 1, credentials, jobUserName);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 6dddf6a..e1ab3b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -240,8 +240,8 @@ public class DAGAppMaster extends AbstractService {
   private DAGClientHandler clientHandler;
 
   private DAG currentDAG;
-  private Credentials amTokens = new Credentials(); // Filled during init
-  private UserGroupInformation appMasterUgi;
+  private final Credentials amCredentials;
+  private final UserGroupInformation appMasterUgi;
 
   private AtomicBoolean sessionStopped = new AtomicBoolean(false);
   private long sessionTimeoutInterval;
@@ -283,7 +283,8 @@ public class DAGAppMaster extends AbstractService {
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
-      String [] localDirs, String[] logDirs, String clientVersion, int maxAppAttempts) {
+      String [] localDirs, String[] logDirs, String clientVersion, int maxAppAttempts,
+      Credentials credentials, String jobUserName) {
     super(DAGAppMaster.class.getName());
     this.clock = clock;
     this.startTime = clock.getTime();
@@ -302,6 +303,11 @@ public class DAGAppMaster extends AbstractService {
     this.dagVersionInfo = new TezDagVersionInfo();
     this.clientVersion = clientVersion;
     this.maxAppAttempts = maxAppAttempts;
+    this.amCredentials = credentials;
+    this.appMasterUgi = UserGroupInformation
+        .createRemoteUser(jobUserName);
+    this.appMasterUgi.addCredentials(amCredentials);
+
 
     // TODO Metrics
     //this.metrics = DAGAppMetrics.create();
@@ -341,8 +347,6 @@ public class DAGAppMaster extends AbstractService {
     }
 
     if (isLocal) {
-       UserGroupInformation.setConfiguration(conf);
-       appMasterUgi = UserGroupInformation.getCurrentUser();
        conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
        conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
            TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
@@ -369,7 +373,7 @@ public class DAGAppMaster extends AbstractService {
     addIfService(containerHeartbeatHandler, true);
 
     sessionToken =
-        TokenCache.getSessionToken(amTokens);
+        TokenCache.getSessionToken(amCredentials);
     if (sessionToken == null) {
       throw new RuntimeException("Could not find session token in AM Credentials");
     }
@@ -1838,6 +1842,12 @@ public class DAGAppMaster extends AbstractService {
 
       CommandLine cliParser = new GnuParser().parse(opts, args);
 
+      // TODO Does this really need to be a YarnConfiguration ?
+      Configuration conf = new Configuration(new YarnConfiguration());
+      TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), conf);
+      UserGroupInformation.setConfiguration(conf);
+      Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
       DAGAppMaster appMaster =
           new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
@@ -1846,11 +1856,11 @@ public class DAGAppMaster extends AbstractService {
               System.getenv(Environment.PWD.name()),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),
-              clientVersion, maxAppAttempts);
+              clientVersion, maxAppAttempts, credentials, jobUserName);
       ShutdownHookManager.get().addShutdownHook(
         new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
 
-      initAndStartAppMaster(appMaster, jobUserName);
+      initAndStartAppMaster(appMaster, conf);
 
     } catch (Throwable t) {
       LOG.fatal("Error starting DAGAppMaster", t);
@@ -2004,29 +2014,18 @@ public class DAGAppMaster extends AbstractService {
     sendEvent(startDagEvent);
   }
 
-  // TODO XXX Does this really need to be a YarnConfiguration ?
   public static void initAndStartAppMaster(final DAGAppMaster appMaster,
-      String jobUserName) throws IOException,
+      final Configuration conf) throws IOException,
       InterruptedException {
 
-    final Configuration conf = new Configuration(new YarnConfiguration());
-    TezUtilsInternal.addUserSpecifiedTezConfiguration(appMaster.workingDirectory, conf);
-
     // Do not automatically close FileSystem objects so that in case of
     // SIGTERM I have a chance to write out the job history. I'll be closing
     // the objects myself.
     conf.setBoolean("fs.automatic.close", false);
     Limits.setConfiguration(conf);
 
-    UserGroupInformation.setConfiguration(conf);
-    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-    
-    appMaster.appMasterUgi = UserGroupInformation
-        .createRemoteUser(jobUserName);
-    appMaster.appMasterUgi.addCredentials(credentials);
-
     // Now remove the AM->RM token so tasks don't have it
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    Iterator<Token<?>> iter = appMaster.amCredentials.getAllTokens().iterator();
     while (iter.hasNext()) {
       Token<?> token = iter.next();
       if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
@@ -2034,8 +2033,6 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
-    appMaster.amTokens = credentials;
-
     appMaster.appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 0343828..2f29569 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -46,10 +46,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -203,7 +205,8 @@ public class LocalContainerLauncher extends AbstractService implements
         tezChild =
             createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
                 context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
-                (TezTaskUmbilicalProtocol) taskAttemptListener);
+                (TezTaskUmbilicalProtocol) taskAttemptListener,
+                TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
       } catch (InterruptedException e) {
         handleLaunchFailed(e, event.getContainerId());
         return;
@@ -318,7 +321,8 @@ public class LocalContainerLauncher extends AbstractService implements
 
   private TezChild createTezChild(Configuration defaultConf, ContainerId containerId,
                                   String tokenIdentifier, int attemptNumber, String[] localDirs,
-                                  TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) throws
+                                  TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol,
+                                  Credentials credentials) throws
       InterruptedException, TezException, IOException {
     Map<String, String> containerEnv = new HashMap<String, String>();
     containerEnv.putAll(localEnv);
@@ -326,7 +330,7 @@ public class LocalContainerLauncher extends AbstractService implements
 
     TezChild tezChild =
         TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
-            attemptNumber, localDirs, workingDirectory, containerEnv, "", ExecutionContext);
+            attemptNumber, localDirs, workingDirectory, containerEnv, "", ExecutionContext, credentials);
     tezChild.setUmbilical(tezTaskUmbilicalProtocol);
     return tezChild;
   }
@@ -342,4 +346,4 @@ public class LocalContainerLauncher extends AbstractService implements
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index aca572b..d34532b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -302,9 +304,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
   public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
       String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime,
       boolean isSession, String workingDirectory, String[] localDirs, String[] logDirs,
-      AtomicBoolean launcherGoFlag, boolean initFailFlag, boolean startFailFlag) {
+      AtomicBoolean launcherGoFlag, boolean initFailFlag, boolean startFailFlag,
+      Credentials credentials, String jobUserName) {
     super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
-        isSession, workingDirectory, localDirs, logDirs,  new TezApiVersionInfo().getVersion(), 1);
+        isSession, workingDirectory, localDirs, logDirs,  new TezApiVersionInfo().getVersion(), 1,
+        credentials, jobUserName);
     containerLauncher = new MockContainerLauncher(launcherGoFlag);
     shutdownHandler = new MockDAGAppMasterShutdownHandler();
     this.initFailFlag = initFailFlag;

http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 4a2fa9b..3cb9d8c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.Clock;
@@ -51,10 +52,10 @@ public class MockLocalClient extends LocalClient {
   protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
       Clock clock, long appSubmitTime, boolean isSession, String userDir,
-      String[] localDirs, String[] logDirs) {
+      String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) {
     mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, localDirs, logDirs,
-        mockAppLauncherGoFlag, initFailFlag, startFailFlag);
+        mockAppLauncherGoFlag, initFailFlag, startFailFlag, credentials, jobUserName);
     return mockApp;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index a71fc55..7a9b600 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -102,6 +102,7 @@ public class TezChild {
   private final ExecutionContext ExecutionContext;
   private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
   private final Map<String, String> serviceProviderEnvMap;
+  private final Credentials credentials;
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   private Multimap<String, String> startedInputsMap = HashMultimap.create();
@@ -115,7 +116,8 @@ public class TezChild {
       String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
       Map<String, String> serviceProviderEnvMap,
       ObjectRegistryImpl objectRegistry, String pid,
-      ExecutionContext ExecutionContext)
+      ExecutionContext ExecutionContext,
+      Credentials credentials)
       throws IOException, InterruptedException {
     this.defaultConf = conf;
     this.containerIdString = containerIdentifier;
@@ -125,6 +127,7 @@ public class TezChild {
     this.workingDir = workingDir;
     this.pid = pid;
     this.ExecutionContext = ExecutionContext;
+    this.credentials = credentials;
 
     getTaskMaxSleepTime = defaultConf.getInt(
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -146,8 +149,7 @@ public class TezChild {
 
     this.objectRegistry = objectRegistry;
 
-    // Security framework already loaded the tokens into current ugi
-    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing with tokens:");
       for (Token<?> token : credentials.getAllTokens()) {
@@ -407,14 +409,12 @@ public class TezChild {
   public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
       String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
       Map<String, String> serviceProviderEnvMap, @Nullable String pid,
-      ExecutionContext ExecutionContext)
+      ExecutionContext ExecutionContext, Credentials credentials)
       throws IOException, InterruptedException, TezException {
 
     // Pull in configuration specified for the session.
     // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
     // for each and every task, and reading it back from disk. Also needs to be per vertex.
-    TezUtilsInternal.addUserSpecifiedTezConfiguration(workingDirectory, conf);
-    UserGroupInformation.setConfiguration(conf);
     Limits.setConfiguration(conf);
 
     // Should this be part of main - Metrics and ObjectRegistry. TezTask setup should be independent
@@ -426,7 +426,7 @@ public class TezChild {
 
     return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
         attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
-        ExecutionContext);
+        ExecutionContext, credentials);
   }
 
   public static void main(String[] args) throws IOException, InterruptedException, TezException {
@@ -451,9 +451,15 @@ public class TezChild {
           + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
           + " tokenIdentifier: " + tokenIdentifier);
     }
+
+    // Security framework already loaded the tokens into current ugi
+    TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), defaultConf);
+    UserGroupInformation.setConfiguration(defaultConf);
+    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
     TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
         tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
-        System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())));
+        System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
+        credentials);
     tezChild.run();
   }
 


Mime
View raw message