tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-384. Code cleanup: AMContainerEventLaunchRequest, AMSchedulerEventTALaunchRequest and related code. (sseth)
Date Thu, 22 Aug 2013 03:05:03 GMT
Updated Branches:
  refs/heads/master 364c1533f -> 9da92ac8d


TEZ-384. Code cleanup: AMContainerEventLaunchRequest,
AMSchedulerEventTALaunchRequest and related code. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9da92ac8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9da92ac8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9da92ac8

Branch: refs/heads/master
Commit: 9da92ac8dd36e79b16b98124bd4561fccf7ef7be
Parents: 364c153
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Aug 21 20:04:22 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Aug 21 20:04:22 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/ContainerContext.java    | 56 +++++++++++
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |  8 --
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 17 +---
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 60 +++---------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 34 ++-----
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 ++---
 .../app/rm/AMSchedulerEventTALaunchRequest.java | 53 +++--------
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  8 +-
 .../AMContainerEventLaunchRequest.java          | 55 ++---------
 .../app/rm/container/AMContainerHelpers.java    | 42 +++------
 .../dag/app/rm/container/AMContainerImpl.java   | 15 ++-
 .../apache/tez/dag/utils/TezEngineChildJVM.java |  3 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 99 +++++++-------------
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 53 +++++------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 12 +--
 .../tez/dag/app/rm/TestContainerReuse.java      | 19 ++--
 .../dag/app/rm/container/TestAMContainer.java   | 21 ++---
 17 files changed, 214 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
new file mode 100644
index 0000000..ab87b4e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+
+public class ContainerContext {
+
+  private final Map<String, LocalResource> localResources;
+  private final Credentials credentials;
+  private final Map<String, String> environment;
+  private final String javaOpts;
+
+  public ContainerContext(
+      Map<String, LocalResource> localResources, Credentials credentials,
+      Map<String, String> environment, String javaOpts) {
+    this.localResources = localResources;
+    this.credentials = credentials;
+    this.environment = environment;
+    this.javaOpts = javaOpts;
+  }
+
+  public Map<String, LocalResource> getLocalResources() {
+    return this.localResources;
+  }
+  
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
+  
+  public Map<String, String> getEnvironment() {
+    return this.environment;
+  }
+  
+  public String getJavaOpts() {
+    return this.javaOpts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 87d41b9..a27c45b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -19,10 +19,8 @@
 package org.apache.tez.dag.app.dag;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
@@ -111,10 +109,4 @@ public interface TaskAttempt {
   public Task getTask();
   
   public boolean getIsRescheduled();
-
-  public Map<String, LocalResource> getLocalResources();
-
-  public Map<String, String> getEnvironment();
-  
-  public String getJavaOpts();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 48c9c33..0e93421 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -303,7 +303,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private int numKilledVertices = 0;
   private boolean isUber = false;
   private DAGTerminationCause terminationCause;
-  private Credentials fsTokens;
+  private Credentials credentials;
   private Token<JobTokenIdentifier> jobToken;
   private JobTokenSecretManager jobTokenSecretManager;
 
@@ -338,7 +338,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
 
-    this.fsTokens = fsTokenCredentials;
+    this.credentials = fsTokenCredentials;
     this.jobTokenSecretManager = jobTokenSecretManager;
 
     this.aclsManager = new ApplicationACLsManager(conf);
@@ -915,7 +915,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       return new VertexImpl(
           vertexId, vertexPlan, vertexName, dag.conf,
           dag.eventHandler, dag.taskAttemptListener,
-          dag.jobToken, dag.fsTokens, dag.clock,
+          dag.credentials, dag.clock,
           dag.taskHeartbeatHandler, dag.appContext,
           vertexLocationHint);
     }
@@ -965,15 +965,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       LOG.info("Adding job token for " + dagIdString
           + " to jobTokenSecretManager");
 
-      // Upload the jobTokens onto the remote FS so that ContainerManager can
-      // localize it to be used by the Containers(tasks)
-      Credentials tokenStorage = new Credentials();
-      // TODO Consider sending the jobToken over RPC.
-      TokenCache.setJobToken(job.jobToken, tokenStorage);
-
-      if (UserGroupInformation.isSecurityEnabled()) {
-        tokenStorage.addAll(job.fsTokens);
-      }
+      // Populate the jobToken into job credentials.
+      TokenCache.setJobToken(job.jobToken, job.credentials);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 05274da..ae3c05f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,12 +32,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -61,6 +57,7 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
@@ -97,7 +94,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -123,8 +119,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   private final Lock writeLock;
   protected final AppContext appContext;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
-  private Credentials credentials;
-  protected Token<JobTokenIdentifier> jobToken;
   private long launchTime = 0;
   private long finishTime = 0;
   // TEZ-347 remove this and getShufflePort()
@@ -146,11 +140,9 @@ public class TaskAttemptImpl implements TaskAttempt,
   Set<String> taskRacks = new HashSet<String>();
 
   protected final TaskLocationHint locationHint;
-  protected final Resource taskResource;
-  protected final Map<String, LocalResource> localResources;
-  protected final Map<String, String> environment;
-  protected final String javaOpts;
   protected final boolean isRescheduled;
+  private final Resource taskResource;
+  private final ContainerContext containerContext;
 
   protected static final FailedTransitionHelper FAILED_HELPER =
       new FailedTransitionHelper();
@@ -257,14 +249,10 @@ public class TaskAttemptImpl implements TaskAttempt,
   // TODO Remove TaskAttemptListener from the constructor.
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
-      TaskAttemptListener tal,
-      Configuration conf,
-      Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
+      TaskAttemptListener tal, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
-      TaskLocationHint locationHint,
-      Resource resource, Map<String, LocalResource> localResources,
-      Map<String, String> environment,
-      String javaOpts, boolean isRescheduled) {
+      TaskLocationHint locationHint, boolean isRescheduled,
+      Resource resource, ContainerContext containerContext) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -272,21 +260,17 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.eventHandler = eventHandler;
     //Reported status
     this.conf = conf;
-    this.jobToken = jobToken;
-    this.credentials = credentials;
     this.clock = clock;
     this.taskHeartbeatHandler = taskHeartbeatHandler;
     this.appContext = appContext;
-    this.taskResource = resource;
     this.reportedStatus = new TaskAttemptStatus();
     initTaskAttemptStatus(reportedStatus);
     RackResolver.init(conf);
     this.stateMachine = stateMachineFactory.make(this);
     this.locationHint = locationHint;
-    this.localResources = localResources;
-    this.environment = environment;
-    this.javaOpts = javaOpts;
     this.isRescheduled = isRescheduled;
+    this.taskResource = resource;
+    this.containerContext = containerContext;
   }
 
 
@@ -908,14 +892,10 @@ public class TaskAttemptImpl implements TaskAttempt,
             + remoteTaskContext);
       }
       // Send out a launch request to the scheduler.
-      AMSchedulerEventTALaunchRequest launchRequestEvent =
-          new AMSchedulerEventTALaunchRequest(ta.attemptId,
-              ta.taskResource,
-              ta.localResources, remoteTaskContext, ta,
-              ta.credentials, ta.jobToken, requestHosts,
-              requestRacks,
-              scheduleEvent.getPriority(), ta.environment, //ta.javaOpts,
-              ta.conf);
+
+      AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
+          ta.attemptId, ta.taskResource, remoteTaskContext, ta, requestHosts,
+          requestRacks, scheduleEvent.getPriority(), ta.containerContext);
       ta.sendEvent(launchRequestEvent);
     }
   }
@@ -1299,20 +1279,4 @@ public class TaskAttemptImpl implements TaskAttempt,
   public String toString() {
     return getID().toString();
   }
-
-  @Override
-  public Map<String, LocalResource> getLocalResources() {
-    return this.localResources;
-  }
-
-  @Override
-  public Map<String, String> getEnvironment() {
-    return this.environment;
-  }
-
-  @Override
-  public String getJavaOpts() {
-	return this.javaOpts;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4fcb30d..a43453c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -29,9 +29,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -50,6 +47,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskReport;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.Task;
@@ -73,7 +71,6 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -99,15 +96,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   // TODO Metrics
   //private final MRAppMetrics metrics;
   protected final AppContext appContext;
+  private final Resource taskResource;
+  private final ContainerContext containerContext;
   private long scheduledTime;
 
   protected boolean encryptedShuffle;
-  protected Credentials credentials;
-  protected Token<JobTokenIdentifier> jobToken;
   protected TaskLocationHint locationHint;
-  protected Resource taskResource;
-  protected Map<String, LocalResource> localResources;
-  protected Map<String, String> environment;
 
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
@@ -261,8 +255,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private final boolean leafVertex;
 
-  protected String javaOpts;
-
   @Override
   public TaskState getState() {
     readLock.lock();
@@ -276,13 +268,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public TaskImpl(TezVertexID vertexId, int taskIndex,
       EventHandler eventHandler, Configuration conf,
       TaskAttemptListener taskAttemptListener,
-      Token<JobTokenIdentifier> jobToken,
-      Credentials credentials, Clock clock,
-      TaskHeartbeatHandler thh, AppContext appContext,
+      Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
       boolean leafVertex, TaskLocationHint locationHint, Resource resource,
-      Map<String, LocalResource> localResources,
-      Map<String, String> environment,
-      String javaOpts) {
+      ContainerContext containerContext) {
     this.conf = conf;
     this.clock = clock;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -295,17 +283,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.taskAttemptListener = taskAttemptListener;
     this.taskHeartbeatHandler = thh;
     this.eventHandler = eventHandler;
-    this.credentials = credentials;
-    this.jobToken = jobToken;
     this.appContext = appContext;
     this.encryptedShuffle = false;
 
     this.leafVertex = leafVertex;
     this.locationHint = locationHint;
     this.taskResource = resource;
-    this.localResources = localResources;
-    this.environment = environment;
-    this.javaOpts = javaOpts;
+    this.containerContext = containerContext;
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -564,10 +548,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
-        taskAttemptListener, conf,
-        jobToken, credentials, clock, taskHeartbeatHandler,
-        appContext, locationHint, taskResource,
-        localResources, environment, javaOpts, (failedAttempts>0));
+        taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
+        locationHint, (failedAttempts > 0), taskResource, containerContext);
   }
 
   protected TaskAttempt getSuccessfulAttempt() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1bd2e63..c0137e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.MRVertexOutputCommitter;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -70,15 +69,16 @@ import org.apache.tez.dag.api.committer.VertexOutputCommitter;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
@@ -103,7 +103,6 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -333,8 +332,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private long finishTime;
   private float progress;
 
-  private Credentials fsTokens;
-  private Token<JobTokenIdentifier> jobToken;
+  private Credentials credentials;
 
   private final TezVertexID vertexId;  //runtime assigned id.
   private final VertexPlan vertexPlan;
@@ -356,13 +354,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Map<String, LocalResource> localResources;
   private Map<String, String> environment;
   private final String javaOpts;
+  private final ContainerContext containerContext;
   private VertexTerminationCause terminationCause;
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
-      Token<JobTokenIdentifier> jobToken,
-      Credentials fsTokenCredentials, Clock clock,
+      Credentials credentials, Clock clock,
       // TODO: Recovery
       //Map<TaskId, TaskInfo> completedTasksFromPreviousRun,
       // TODO Metrics
@@ -386,8 +384,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
 
-    this.fsTokens = fsTokenCredentials;
-    this.jobToken = jobToken;
+    this.credentials = credentials;
     this.committer = new NullVertexOutputCommitter();
     this.vertexLocationHint = vertexLocationHint;
 
@@ -409,6 +406,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         this.processorDescriptor.getUserPayload(), this.vertexId,
         getApplicationAttemptId());
 
+    this.containerContext = new ContainerContext(this.localResources,
+        this.credentials, this.environment, this.javaOpts);
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
@@ -1022,16 +1021,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 vertex.eventHandler,
                 conf,
                 vertex.taskAttemptListener,
-                vertex.jobToken,
-                vertex.fsTokens,
                 vertex.clock,
                 vertex.taskHeartbeatHandler,
                 vertex.appContext,
                 vertex.targetVertices.isEmpty(),
                 locHint, vertex.taskResource,
-                vertex.localResources,
-                vertex.environment,
-                vertex.javaOpts);
+                vertex.containerContext);
         vertex.addTask(task);
         if(LOG.isDebugEnabled()) {
           LOG.debug("Created task for vertex " + vertex.getVertexId() + ": " +

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index 21c6035..f66c1a2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -17,18 +17,12 @@
 
 package org.apache.tez.dag.app.rm;
 
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 
 public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
 
@@ -36,38 +30,29 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
   //.... Maybe have the Container talk to the TaskAttempt to pull in the remote task.
 
   private final TezTaskAttemptID attemptId;
-  private final Resource capability;
-  private final Map<String, LocalResource> localResources;
-  private final TezTaskContext remoteTaskContext;
-  private final TaskAttempt taskAttempt;
-  private final Credentials credentials;
-  private Token<JobTokenIdentifier> jobToken;
   private final String[] hosts;
   private final String[] racks;
   private final Priority priority;
-  private final Map<String, String> environment;
-  private final Configuration conf;
+  private final Resource capability;
+  private final ContainerContext containerContext;
+  
+
+  private final TezTaskContext remoteTaskContext;
+  private final TaskAttempt taskAttempt;
 
   public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
       Resource capability,
-      Map<String, LocalResource> localResources,
       TezTaskContext remoteTaskContext, TaskAttempt ta,
-      Credentials credentials, Token<JobTokenIdentifier> jobToken,
-      String[] hosts, String[] racks, Priority priority,
-      Map<String, String> environment, Configuration conf) {
+      String[] hosts, String[] racks, Priority priority, ContainerContext containerContext) {
     super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
     this.attemptId = attemptId;
     this.capability = capability;
-    this.localResources = localResources;
     this.remoteTaskContext = remoteTaskContext;
     this.taskAttempt = ta;
-    this.credentials = credentials;
-    this.jobToken = jobToken;
     this.hosts = hosts;
     this.racks = racks;
     this.priority = priority;
-    this.environment = environment;
-    this.conf = conf;
+    this.containerContext = containerContext;
   }
 
   public TezTaskAttemptID getAttemptID() {
@@ -98,24 +83,8 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
     return this.taskAttempt;
   }
 
-  public Credentials getCredentials() {
-    return this.credentials;
-  }
-
-  public Token<JobTokenIdentifier> getJobToken() {
-    return this.jobToken;
-  }
-
-  public Map<String, LocalResource> getLocalResources() {
-    return this.localResources;
-  }
-
-  public Map<String, String> getEnvironment() {
-    return this.environment;
-  }
-
-  public Configuration getConf() {
-    return this.conf;
+  public ContainerContext getContainerContext() {
+    return this.containerContext;
   }
 
   // Parameter replacement: @taskid@ will not be usable

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index ea67d45..1cb6de6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -395,12 +395,8 @@ public class TaskSchedulerEventHandler extends AbstractService
       sendEvent(new AMContainerEventLaunchRequest(
           containerId,
           taskAttempt.getVertexID(),
-          event.getJobToken(),
-          // TODO getConf from AMSchedulerEventTALaunchRequest
-          event.getCredentials(), false, event.getConf(),
-          taskAttempt.getLocalResources(),
-          taskAttempt.getEnvironment(),
-          taskAttempt.getJavaOpts()));
+          false,
+          event.getContainerContext()));
     }
     sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
     sendEvent(new AMContainerEventAssignTA(containerId,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
index 293339c..f97863a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
@@ -18,43 +18,24 @@
 
 package org.apache.tez.dag.app.rm.container;
 
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 
 public class AMContainerEventLaunchRequest extends AMContainerEvent {
 
   private final TezVertexID vertexId;
-  private final Token<JobTokenIdentifier> jobToken;
-  private final Credentials credentials;
   private final boolean shouldProfile;
-  private final Configuration conf;
-  private final Map<String, LocalResource> localResources;
-  private final Map<String, String> environment;
-  private final String javaOpts;
+  private final ContainerContext containerContext;
 
   public AMContainerEventLaunchRequest(ContainerId containerId,
-      TezVertexID vertexId,
-      Token<JobTokenIdentifier> jobToken,
-      Credentials credentials, boolean shouldProfile, Configuration conf,
-      Map<String, LocalResource> localResources,
-      Map<String, String> environment, String javaOpts) {
+      TezVertexID vertexId, boolean shouldProfile,
+      ContainerContext containerContext) {
     super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
     this.vertexId = vertexId;
-    this.jobToken = jobToken;
-    this.credentials = credentials;
     this.shouldProfile = shouldProfile;
-    this.conf = conf;
-    this.localResources = localResources;
-    this.environment = environment;
-    this.javaOpts = javaOpts;
+    this.containerContext = containerContext;
   }
 
   public TezDAGID getDAGId() {
@@ -65,31 +46,11 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
     return this.vertexId;
   }
 
-  public Token<JobTokenIdentifier> getJobToken() {
-    return this.jobToken;
-  }
-
-  public Credentials getCredentials() {
-    return this.credentials;
-  }
-
   public boolean shouldProfile() {
     return this.shouldProfile;
   }
-
-  public Configuration getConf() {
-    return this.conf;
-  }
-
-  public Map<String, LocalResource> getLocalResources() {
-    return localResources;
-  }
-
-  public Map<String, String> getEnvironment() {
-    return environment;
-  }
-
-  public String getJavaOpts() {
-	  return javaOpts;
+  
+  public ContainerContext getContainerContext() {
+    return this.containerContext;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 5651d48..1855dbd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -23,34 +23,28 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezEngineChildJVM;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
 
@@ -85,9 +79,8 @@ public class AMContainerHelpers {
    * @param applicationACLs
    */
   private static ContainerLaunchContext createCommonContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
-      Token<JobTokenIdentifier> jobToken,
-      TezVertexID vertexId, Credentials credentials, AppContext appContext) {
+      Map<ApplicationAccessType, String> applicationACLs,
+      Credentials credentials) {
 
     // Application resources
     Map<String, LocalResource> localResources =
@@ -107,16 +100,11 @@ public class AMContainerHelpers {
       // Setup up task credentials buffer
       Credentials taskCredentials = new Credentials();
 
-      if (UserGroupInformation.isSecurityEnabled()) {
-        LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
-            + credentials.numberOfSecretKeys()
-            + " secret keys for NM use for launching container");
-        taskCredentials.addAll(credentials);
-      }
-
-      // LocalStorageToken is needed irrespective of whether security is enabled
-      // or not.
-      TokenCache.setJobToken(jobToken, taskCredentials);
+      // Add tokens if they exist.
+      LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+          + credentials.numberOfSecretKeys()
+          + " secret keys for NM use for launching container");
+      taskCredentials.addAll(credentials);
 
       DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
       LOG.info("Size of containertokens_dob is "
@@ -128,7 +116,8 @@ public class AMContainerHelpers {
       // Add shuffle token
       LOG.info("Putting shuffle token in serviceData");
       serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
-          ShuffleHandler.serializeServiceData(jobToken));
+          ShuffleHandler.serializeServiceData(TokenCache
+              .getJobToken(taskCredentials)));
 
     } catch (IOException e) {
       throw new TezUncheckedException(e);
@@ -146,9 +135,8 @@ public class AMContainerHelpers {
   @VisibleForTesting
   public static ContainerLaunchContext createContainerLaunchContext(
       Map<ApplicationAccessType, String> acls,
-      ContainerId containerId, Configuration conf, TezVertexID vertexId,
-      Token<JobTokenIdentifier> jobToken,
-      Resource assignedCapability, Map<String, LocalResource> localResources,
+      ContainerId containerId,
+      Map<String, LocalResource> localResources,
       Map<String, String> vertexEnv,
       String javaOpts,
       TaskAttemptListener taskAttemptListener, Credentials credentials,
@@ -157,7 +145,7 @@ public class AMContainerHelpers {
     synchronized (commonContainerSpecLock) {
       if (commonContainerSpec == null) {
         commonContainerSpec = createCommonContainerLaunchContext(
-            acls, conf, jobToken, vertexId, credentials, appContext);
+            acls, credentials);
       }
     }
 
@@ -176,7 +164,7 @@ public class AMContainerHelpers {
 
     // Set up the launch command
     List<String> commands = TezEngineChildJVM.getVMCommand(
-        taskAttemptListener.getAddress(), conf, containerId.toString(),
+        taskAttemptListener.getAddress(), containerId.toString(),
         appContext.getApplicationID().toString(),
         appContext.getApplicationAttemptId().getAttemptId(),
         shouldProfile, javaOpts);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index f37d0fc..9d44c11 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
@@ -337,17 +338,15 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventLaunchRequest event = (AMContainerEventLaunchRequest) cEvent;
+      ContainerContext containerContext = event.getContainerContext();
 
       container.clc = AMContainerHelpers.createContainerLaunchContext(
           container.appContext.getApplicationACLs(),
-          container.getContainerId(), event.getConf(),
-          event.getVertexId(),
-          event.getJobToken(),
-          container.getContainer().getResource(),
-          event.getLocalResources(),
-          event.getEnvironment(),
-          event.getJavaOpts(),
-          container.taskAttemptListener, event.getCredentials(),
+          container.getContainerId(),
+          containerContext.getLocalResources(),
+          containerContext.getEnvironment(),
+          containerContext.getJavaOpts(),
+          container.taskAttemptListener, containerContext.getCredentials(),
           event.shouldProfile(), container.appContext);
 
       // Registering now, so that in case of delayed NM response, the child

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
index b1dd93d..8919698 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
@@ -22,7 +22,6 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Vector;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.YarnTezDagChild;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -66,7 +65,7 @@ public class TezEngineChildJVM {
   }
 
   public static List<String> getVMCommand(
-      InetSocketAddress taskAttemptListenerAddr, Configuration conf,
+      InetSocketAddress taskAttemptListenerAddr,
       String containerIdentifier,
       String tokenIdentifier,
       int applicationAttemptNumber,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index b2f7cbd..3274a4a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -62,6 +61,7 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
@@ -83,7 +83,6 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -129,12 +128,9 @@ public class TestTaskAttempt {
     TezTaskID taskID = new TezTaskID(
         new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), new Configuration(),
-        mock(Token.class), new Credentials(), new SystemClock(),
+        mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
-        locationHint, Resource.newInstance(1024, 1),
-        new HashMap<String, LocalResource>(), new HashMap<String, String>(),
-        "", false);
+        locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext());
 
     TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
 
@@ -176,11 +172,10 @@ public class TestTaskAttempt {
         new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(),
-        mock(Token.class), new Credentials(), new SystemClock(),
-        mock(TaskHeartbeatHandler.class), mock(AppContext.class),
-        locationHint, Resource.newInstance(1024, 1),
-        new HashMap<String, LocalResource>(), new HashMap<String, String>(),
-        "", false);
+        new SystemClock(), mock(TaskHeartbeatHandler.class),
+        mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
+            1), createFakeContainerContext());
+        
     TaskAttemptImpl spyTa = spy(taImpl);
     when(spyTa.resolveHosts(hosts)).thenReturn(
         resolved.toArray(new String[3]));
@@ -318,18 +313,14 @@ public class TestTaskAttempt {
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    Map<String, String> environment = new HashMap<String, String>();
-    String javaOpts = "";
 
     AppContext mockAppContext = mock(AppContext.class);
     doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, taskConf, mock(Token.class), new Credentials(),
-        new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext,
-        locationHint, resource, localResources,
-        environment, javaOpts, false);
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), mockAppContext, locationHint, false,
+        resource, createFakeContainerContext());
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
@@ -372,9 +363,6 @@ public class TestTaskAttempt {
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    Map<String, String> environment = new HashMap<String, String>();
-    String javaOpts = "";
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
@@ -393,10 +381,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, taskConf, mock(Token.class), new Credentials(),
-        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        locationHint, resource, localResources,
-        environment, javaOpts, false);
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext());
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
@@ -467,9 +454,6 @@ public class TestTaskAttempt {
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    Map<String, String> environment = new HashMap<String, String>();
-    String javaOpts = "";
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
@@ -488,10 +472,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, taskConf, mock(Token.class), new Credentials(),
-        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        locationHint, resource, localResources,
-        environment, javaOpts, false);
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext());
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
     // At state STARTING.
@@ -532,9 +515,6 @@ public class TestTaskAttempt {
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    Map<String, String> environment = new HashMap<String, String>();
-    String javaOpts = "";
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
@@ -553,10 +533,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, taskConf, mock(Token.class), new Credentials(),
-        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        locationHint, resource, localResources,
-        environment, javaOpts, false);
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext());
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
     // At state STARTING.
@@ -599,9 +578,6 @@ public class TestTaskAttempt {
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    Map<String, String> environment = new HashMap<String, String>();
-    String javaOpts = "";
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
@@ -620,10 +596,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, taskConf, mock(Token.class), new Credentials(),
-        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        locationHint, resource, localResources,
-        environment, javaOpts, false);
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext());
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
@@ -692,10 +667,6 @@ public class TestTaskAttempt {
     TaskLocationHint locationHint = new TaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
-    Map<String, LocalResource> localResources =
-        new HashMap<String, LocalResource>();
-    Map<String, String> environment = new HashMap<String, String>();
-    String javaOpts = "";
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
     ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
@@ -714,10 +685,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, taskConf, mock(Token.class), new Credentials(),
-        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        locationHint, resource, localResources,
-        environment, javaOpts, false);
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext());
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
     // At state STARTING.
@@ -786,16 +756,13 @@ public class TestTaskAttempt {
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
         EventHandler eventHandler, TaskAttemptListener tal,
-        Configuration conf, Token<JobTokenIdentifier> jobToken,
-        Credentials credentials, Clock clock,
+        Configuration conf, Clock clock,
         TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
-        TaskLocationHint locationHint,
-        Resource resource, Map<String, LocalResource> localResources,
-        Map<String, String> environment, String javaOpts, boolean isRescheduled) {
+        TaskLocationHint locationHint,  boolean isRescheduled,
+        Resource resource, ContainerContext containerContext) {
       super(taskId, attemptNumber, eventHandler, tal, conf,
-          jobToken, credentials, clock, taskHeartbeatHandler, appContext,
-          locationHint, resource, localResources, environment,
-          javaOpts, isRescheduled);
+          clock, taskHeartbeatHandler, appContext,
+          locationHint, isRescheduled, resource, containerContext);
     }
 
     @Override
@@ -818,6 +785,10 @@ public class TestTaskAttempt {
     protected void logJobHistoryAttemptUnsuccesfulCompletion(
         TaskAttemptState state) {
     }
-
+  }
+  
+  private static ContainerContext createFakeContainerContext() {
+    return new ContainerContext(new HashMap<String, LocalResource>(),
+        new Credentials(), new HashMap<String, String>(), "");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 35b6e7d..ad36b7b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -33,7 +33,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -45,10 +44,11 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
+import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -58,7 +58,6 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -74,7 +73,6 @@ public class TestTaskImpl {
   private Configuration conf;
   private TaskAttemptListener taskAttemptListener;
   private TaskHeartbeatHandler taskHeartbeatHandler;
-  private Token<JobTokenIdentifier> jobToken;
   private Credentials credentials;
   private Clock clock;
   private TaskLocationHint locationHint;
@@ -88,17 +86,16 @@ public class TestTaskImpl {
   private Map<String, String> environment;
   private String javaOpts;
   private boolean leafVertex;
+  private ContainerContext containerContext;
 
   private MockTaskImpl mockTask;
 
-  @SuppressWarnings("unchecked")
   @Before
   public void setup() {
     dispatcher = new InlineDispatcher();
     conf = new Configuration();
     taskAttemptListener = mock(TaskAttemptListener.class);
     taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
-    jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
     credentials = null;
     clock = new SystemClock();
     locationHint = new TaskLocationHint(null, null);
@@ -112,12 +109,14 @@ public class TestTaskImpl {
     environment = new HashMap<String, String>();
     javaOpts = "";
     leafVertex = false;
+    containerContext = new ContainerContext(localResources, credentials,
+        environment, javaOpts);
     Vertex vertex = mock(Vertex.class);
 
     mockTask = new MockTaskImpl(vertexId, partition,
-        dispatcher.getEventHandler(), conf, taskAttemptListener, jobToken,
-        credentials, clock, taskHeartbeatHandler, appContext, leafVertex,
-        locationHint, taskResource, localResources, environment, javaOpts, vertex);
+        dispatcher.getEventHandler(), conf, taskAttemptListener, clock,
+        taskHeartbeatHandler, appContext, leafVertex, locationHint,
+        taskResource, containerContext, vertex);
   }
 
   private TezTaskID getNewTaskID() {
@@ -376,17 +375,13 @@ public class TestTaskImpl {
 
     public MockTaskImpl(TezVertexID vertexId, int partition,
         EventHandler eventHandler, Configuration conf,
-        TaskAttemptListener taskAttemptListener,
-        Token<JobTokenIdentifier> jobToken, Credentials credentials,
-        Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
-        boolean leafVertex,
+        TaskAttemptListener taskAttemptListener, Clock clock,
+        TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex,
         TaskLocationHint locationHint, Resource resource,
-        Map<String, LocalResource> localResources,
-        Map<String, String> environment, String javaOpts, Vertex vertex) {
+        ContainerContext containerContext, Vertex vertex) {
       super(vertexId, partition, eventHandler, conf, taskAttemptListener,
-          jobToken, credentials, clock, thh, appContext,
-          leafVertex, locationHint, resource, localResources, environment,
-          javaOpts);
+          clock, thh, appContext, leafVertex, locationHint, resource,
+          containerContext);
       this.vertex = vertex;
     }
 
@@ -394,9 +389,8 @@ public class TestTaskImpl {
     protected TaskAttemptImpl createAttempt(int attemptNumber) {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
           attemptNumber, eventHandler, taskAttemptListener,
-          conf, jobToken, credentials, clock, taskHeartbeatHandler, appContext,
-          locationHint, taskResource, localResources,
-          environment, javaOpts, true);
+          conf, clock, taskHeartbeatHandler, appContext,
+          locationHint, true, taskResource, containerContext);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -437,17 +431,12 @@ public class TestTaskImpl {
     private TaskAttemptState state = TaskAttemptState.NEW;
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
-        EventHandler eventHandler, TaskAttemptListener tal, 
-        Configuration conf, Token<JobTokenIdentifier> jobToken,
-        Credentials credentials, Clock clock, TaskHeartbeatHandler thh,
-        AppContext appContext,
-        TaskLocationHint locationHing, Resource resource,
-        Map<String, LocalResource> localResources,
-        Map<String, String> environment, String javaOpts, boolean isRescheduled) {
-      super(taskId, attemptNumber, eventHandler, tal, conf,
-          jobToken, credentials, clock, thh, appContext,
-          locationHing, resource, localResources, environment, javaOpts,
-          isRescheduled);
+        EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
+        Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
+        TaskLocationHint locationHing, boolean isRescheduled,
+        Resource resource, ContainerContext containerContext) {
+      super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
+          appContext, locationHing, isRescheduled, resource, containerContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6022a6d..b11aaab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.MRVertexOutputCommitter;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -64,8 +63,8 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -75,15 +74,14 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.avro.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent.Status;
 import org.junit.After;
@@ -104,7 +102,6 @@ public class TestVertexImpl {
   private DrainDispatcher dispatcher;
   private TaskAttemptListener taskAttemptListener;
   private Credentials fsTokens;
-  private Token<JobTokenIdentifier> jobToken;
   private Clock clock = new SystemClock();
   private TaskHeartbeatHandler thh;
   private AppContext appContext;
@@ -459,7 +456,7 @@ public class TestVertexImpl {
       String vName = vPlan.getName();
       TezVertexID vertexId = new TezVertexID(dagId, i+1);
       VertexImpl v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
-          dispatcher.getEventHandler(), taskAttemptListener, jobToken, fsTokens,
+          dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
           clock, thh, appContext, vertexLocationHint);
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -514,7 +511,6 @@ public class TestVertexImpl {
     dagPlan = createTestDAGPlan();
     dispatcher = new DrainDispatcher();
     fsTokens = new Credentials();
-    jobToken = new Token<JobTokenIdentifier>();
     appContext = mock(AppContext.class);
     DAG dag = mock(DAG.class);
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
@@ -1193,7 +1189,7 @@ public class TestVertexImpl {
     TezVertexID vId = new TezVertexID(invalidDagId, 1);
     VertexPlan vPlan = dPlan.getVertex(0);
     VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
-        dispatcher.getEventHandler(), taskAttemptListener, jobToken, fsTokens,
+        dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
         clock, thh, appContext, vertexLocationHint);
     v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
     Assert.assertEquals(VertexState.FAILED, v.getState());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 7d1b22f..40b4c17 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
@@ -478,15 +479,15 @@ public class TestContainerReuse {
   private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(
       TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts,
       String[] racks, Priority priority, Configuration conf) {
-    AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(
-        taID, capability, new HashMap<String, LocalResource>(),
-        new TezEngineTaskContext(taID, "user", "jobName", "vertexName",
-            new ProcessorDescriptor("processorClassName"),
-            Collections.singletonList(new InputSpec("vertexName", 1,
-                "inputClassName")), Collections.singletonList(new OutputSpec(
-                "vertexName", 1, "outputClassName"))), ta,
-        new Credentials(), null, hosts, racks, priority,
-        new HashMap<String, String>(), conf);
+
+    ContainerContext containerContext = 
+        new ContainerContext(new HashMap<String, LocalResource>(),
+            new Credentials(), new HashMap<String, String>(), "");
+    AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability, new TezEngineTaskContext(taID, "user", "jobName", "vertexName",
+        new ProcessorDescriptor("processorClassName"),
+        Collections.singletonList(new InputSpec("vertexName", 1,
+            "inputClassName")), Collections.singletonList(new OutputSpec(
+            "vertexName", 1, "outputClassName"))), ta, hosts, racks, priority, containerContext);
     return lr;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index b9e907a..2d2945e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -37,7 +37,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -56,6 +55,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
@@ -67,6 +67,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.TokenCache;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -825,11 +826,8 @@ public class TestAMContainer {
 
     TezTaskContext tezTaskContext;
 
-    Token<JobTokenIdentifier> jobToken;
-
     public AMContainerImpl amContainer;
 
-    @SuppressWarnings("unchecked")
     public WrappedContainer() {
       applicationID = ApplicationId.newInstance(rmIdentifier, 1);
       appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1);
@@ -864,9 +862,6 @@ public class TestAMContainer {
       tezTaskContext = mock(TezTaskContext.class);
       doReturn(taskAttemptID).when(tezTaskContext).getTaskAttemptId();
 
-
-      jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
-
       amContainer = new AMContainerImpl(container, chh, tal,
           appContext);
     }
@@ -896,10 +891,14 @@ public class TestAMContainer {
 
     public void launchContainer() {
       reset(eventHandler);
-      amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
-          jobToken, new Credentials(), false, new Configuration(),
-          new HashMap<String, LocalResource>(), new HashMap<String, String>(),
-          null));
+      Credentials credentials = new Credentials();
+      @SuppressWarnings("unchecked")
+      Token<JobTokenIdentifier> jobToken = mock(Token.class);
+      TokenCache.setJobToken(jobToken, credentials);
+      amContainer.handle(new AMContainerEventLaunchRequest(containerID,
+          vertexID, false, new ContainerContext(
+              new HashMap<String, LocalResource>(), credentials,
+              new HashMap<String, String>(), "")));
     }
 
     public void assignTaskAttempt(TezTaskAttemptID taID) {


Mime
View raw message