Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 397CC10E0B for ; Thu, 22 Aug 2013 03:05:41 +0000 (UTC) Received: (qmail 23570 invoked by uid 500); 22 Aug 2013 03:05:40 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 23556 invoked by uid 500); 22 Aug 2013 03:05:39 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 23548 invoked by uid 99); 22 Aug 2013 03:05:36 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Aug 2013 03:05:36 +0000 X-ASF-Spam-Status: No, hits=-2002.8 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 22 Aug 2013 03:05:27 +0000 Received: (qmail 23405 invoked by uid 99); 22 Aug 2013 03:05:04 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Aug 2013 03:05:04 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B64A28C25D2; Thu, 22 Aug 2013 03:05:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: <3d5539dcf2bb4285ae91e563738f7b8c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-384. Code cleanup: AMContainerEventLaunchRequest, AMSchedulerEventTALaunchRequest and related code. (sseth) Date: Thu, 22 Aug 2013 03:05:03 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 364c1533f -> 9da92ac8d TEZ-384. Code cleanup: AMContainerEventLaunchRequest, AMSchedulerEventTALaunchRequest and related code. (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9da92ac8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9da92ac8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9da92ac8 Branch: refs/heads/master Commit: 9da92ac8dd36e79b16b98124bd4561fccf7ef7be Parents: 364c153 Author: Siddharth Seth Authored: Wed Aug 21 20:04:22 2013 -0700 Committer: Siddharth Seth Committed: Wed Aug 21 20:04:22 2013 -0700 ---------------------------------------------------------------------- .../apache/tez/dag/app/ContainerContext.java | 56 +++++++++++ .../org/apache/tez/dag/app/dag/TaskAttempt.java | 8 -- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 17 +--- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 60 +++--------- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 34 ++----- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 ++--- .../app/rm/AMSchedulerEventTALaunchRequest.java | 53 +++-------- .../dag/app/rm/TaskSchedulerEventHandler.java | 8 +- .../AMContainerEventLaunchRequest.java | 55 ++--------- .../app/rm/container/AMContainerHelpers.java | 42 +++------ .../dag/app/rm/container/AMContainerImpl.java | 15 ++- .../apache/tez/dag/utils/TezEngineChildJVM.java | 3 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 99 +++++++------------- .../tez/dag/app/dag/impl/TestTaskImpl.java | 53 +++++------ .../tez/dag/app/dag/impl/TestVertexImpl.java | 12 +-- .../tez/dag/app/rm/TestContainerReuse.java | 19 ++-- .../dag/app/rm/container/TestAMContainer.java | 21 ++--- 17 files changed, 214 insertions(+), 364 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java new file mode 100644 index 0000000..ab87b4e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.tez.dag.app; + +import java.util.Map; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.LocalResource; + +public class ContainerContext { + + private final Map localResources; + private final Credentials credentials; + private final Map environment; + private final String javaOpts; + + public ContainerContext( + Map localResources, Credentials credentials, + Map environment, String javaOpts) { + this.localResources = localResources; + this.credentials = credentials; + this.environment = environment; + this.javaOpts = javaOpts; + } + + public Map getLocalResources() { + return this.localResources; + } + + public Credentials getCredentials() { + return this.credentials; + } + + public Map getEnvironment() { + return this.environment; + } + + public String getJavaOpts() { + return this.javaOpts; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index 87d41b9..a27c45b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -19,10 +19,8 @@ package org.apache.tez.dag.app.dag; import java.util.List; -import java.util.Map; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.oldrecords.TaskAttemptReport; @@ -111,10 +109,4 @@ public interface TaskAttempt { public Task getTask(); public boolean getIsRescheduled(); - - public Map getLocalResources(); - - public Map getEnvironment(); - - public String getJavaOpts(); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 48c9c33..0e93421 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -303,7 +303,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private int numKilledVertices = 0; private boolean isUber = false; private DAGTerminationCause terminationCause; - private Credentials fsTokens; + private Credentials credentials; private Token jobToken; private JobTokenSecretManager jobTokenSecretManager; @@ -338,7 +338,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); - this.fsTokens = fsTokenCredentials; + this.credentials = fsTokenCredentials; this.jobTokenSecretManager = jobTokenSecretManager; this.aclsManager = new ApplicationACLsManager(conf); @@ -915,7 +915,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, return new VertexImpl( vertexId, vertexPlan, vertexName, dag.conf, dag.eventHandler, dag.taskAttemptListener, - dag.jobToken, dag.fsTokens, dag.clock, + dag.credentials, dag.clock, dag.taskHeartbeatHandler, dag.appContext, vertexLocationHint); } @@ -965,15 +965,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, LOG.info("Adding job token for " + dagIdString + " to jobTokenSecretManager"); - // Upload the jobTokens onto the remote FS so that ContainerManager can - // localize it to be used by the Containers(tasks) - Credentials tokenStorage = new Credentials(); - // TODO Consider sending the jobToken over RPC. - TokenCache.setJobToken(job.jobToken, tokenStorage); - - if (UserGroupInformation.isSecurityEnabled()) { - tokenStorage.addAll(job.fsTokens); - } + // Populate the jobToken into job credentials. + TokenCache.setJobToken(job.jobToken, job.credentials); } /** http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 05274da..ae3c05f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -33,12 +32,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -61,6 +57,7 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.dag.api.oldrecords.TaskAttemptReport; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; @@ -97,7 +94,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TezBuilderUtils; -import org.apache.tez.engine.common.security.JobTokenIdentifier; import com.google.common.annotations.VisibleForTesting; @@ -123,8 +119,6 @@ public class TaskAttemptImpl implements TaskAttempt, private final Lock writeLock; protected final AppContext appContext; private final TaskHeartbeatHandler taskHeartbeatHandler; - private Credentials credentials; - protected Token jobToken; private long launchTime = 0; private long finishTime = 0; // TEZ-347 remove this and getShufflePort() @@ -146,11 +140,9 @@ public class TaskAttemptImpl implements TaskAttempt, Set taskRacks = new HashSet(); protected final TaskLocationHint locationHint; - protected final Resource taskResource; - protected final Map localResources; - protected final Map environment; - protected final String javaOpts; protected final boolean isRescheduled; + private final Resource taskResource; + private final ContainerContext containerContext; protected static final FailedTransitionHelper FAILED_HELPER = new FailedTransitionHelper(); @@ -257,14 +249,10 @@ public class TaskAttemptImpl implements TaskAttempt, // TODO Remove TaskAttemptListener from the constructor. @SuppressWarnings("rawtypes") public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, - TaskAttemptListener tal, - Configuration conf, - Token jobToken, Credentials credentials, Clock clock, + TaskAttemptListener tal, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, - TaskLocationHint locationHint, - Resource resource, Map localResources, - Map environment, - String javaOpts, boolean isRescheduled) { + TaskLocationHint locationHint, boolean isRescheduled, + Resource resource, ContainerContext containerContext) { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -272,21 +260,17 @@ public class TaskAttemptImpl implements TaskAttempt, this.eventHandler = eventHandler; //Reported status this.conf = conf; - this.jobToken = jobToken; - this.credentials = credentials; this.clock = clock; this.taskHeartbeatHandler = taskHeartbeatHandler; this.appContext = appContext; - this.taskResource = resource; this.reportedStatus = new TaskAttemptStatus(); initTaskAttemptStatus(reportedStatus); RackResolver.init(conf); this.stateMachine = stateMachineFactory.make(this); this.locationHint = locationHint; - this.localResources = localResources; - this.environment = environment; - this.javaOpts = javaOpts; this.isRescheduled = isRescheduled; + this.taskResource = resource; + this.containerContext = containerContext; } @@ -908,14 +892,10 @@ public class TaskAttemptImpl implements TaskAttempt, + remoteTaskContext); } // Send out a launch request to the scheduler. - AMSchedulerEventTALaunchRequest launchRequestEvent = - new AMSchedulerEventTALaunchRequest(ta.attemptId, - ta.taskResource, - ta.localResources, remoteTaskContext, ta, - ta.credentials, ta.jobToken, requestHosts, - requestRacks, - scheduleEvent.getPriority(), ta.environment, //ta.javaOpts, - ta.conf); + + AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest( + ta.attemptId, ta.taskResource, remoteTaskContext, ta, requestHosts, + requestRacks, scheduleEvent.getPriority(), ta.containerContext); ta.sendEvent(launchRequestEvent); } } @@ -1299,20 +1279,4 @@ public class TaskAttemptImpl implements TaskAttempt, public String toString() { return getID().toString(); } - - @Override - public Map getLocalResources() { - return this.localResources; - } - - @Override - public Map getEnvironment() { - return this.environment; - } - - @Override - public String getJavaOpts() { - return this.javaOpts; - } - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 4fcb30d..a43453c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -29,9 +29,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -50,6 +47,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskReport; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.Task; @@ -73,7 +71,6 @@ import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.apache.tez.engine.records.TezDependentTaskCompletionEvent; import com.google.common.annotations.VisibleForTesting; @@ -99,15 +96,12 @@ public class TaskImpl implements Task, EventHandler { // TODO Metrics //private final MRAppMetrics metrics; protected final AppContext appContext; + private final Resource taskResource; + private final ContainerContext containerContext; private long scheduledTime; protected boolean encryptedShuffle; - protected Credentials credentials; - protected Token jobToken; protected TaskLocationHint locationHint; - protected Resource taskResource; - protected Map localResources; - protected Map environment; // counts the number of attempts that are either running or in a state where // they will come to be running when they get a Container @@ -261,8 +255,6 @@ public class TaskImpl implements Task, EventHandler { private final boolean leafVertex; - protected String javaOpts; - @Override public TaskState getState() { readLock.lock(); @@ -276,13 +268,9 @@ public class TaskImpl implements Task, EventHandler { public TaskImpl(TezVertexID vertexId, int taskIndex, EventHandler eventHandler, Configuration conf, TaskAttemptListener taskAttemptListener, - Token jobToken, - Credentials credentials, Clock clock, - TaskHeartbeatHandler thh, AppContext appContext, + Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex, TaskLocationHint locationHint, Resource resource, - Map localResources, - Map environment, - String javaOpts) { + ContainerContext containerContext) { this.conf = conf; this.clock = clock; ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -295,17 +283,13 @@ public class TaskImpl implements Task, EventHandler { this.taskAttemptListener = taskAttemptListener; this.taskHeartbeatHandler = thh; this.eventHandler = eventHandler; - this.credentials = credentials; - this.jobToken = jobToken; this.appContext = appContext; this.encryptedShuffle = false; this.leafVertex = leafVertex; this.locationHint = locationHint; this.taskResource = resource; - this.localResources = localResources; - this.environment = environment; - this.javaOpts = javaOpts; + this.containerContext = containerContext; stateMachine = stateMachineFactory.make(this); } @@ -564,10 +548,8 @@ public class TaskImpl implements Task, EventHandler { TaskAttemptImpl createAttempt(int attemptNumber) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, - taskAttemptListener, conf, - jobToken, credentials, clock, taskHeartbeatHandler, - appContext, locationHint, taskResource, - localResources, environment, javaOpts, (failedAttempts>0)); + taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, + locationHint, (failedAttempts > 0), taskResource, containerContext); } protected TaskAttempt getSuccessfulAttempt() { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 1bd2e63..c0137e2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.MRVertexOutputCommitter; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -70,15 +69,16 @@ import org.apache.tez.dag.api.committer.VertexOutputCommitter; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskTerminationCause; import org.apache.tez.dag.app.dag.Vertex; -import org.apache.tez.dag.app.dag.VertexTerminationCause; import org.apache.tez.dag.app.dag.VertexScheduler; import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.VertexTerminationCause; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; import org.apache.tez.dag.app.dag.event.DAGEventType; @@ -103,7 +103,6 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.apache.tez.engine.records.TezDependentTaskCompletionEvent; import com.google.common.annotations.VisibleForTesting; @@ -333,8 +332,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private long finishTime; private float progress; - private Credentials fsTokens; - private Token jobToken; + private Credentials credentials; private final TezVertexID vertexId; //runtime assigned id. private final VertexPlan vertexPlan; @@ -356,13 +354,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private Map localResources; private Map environment; private final String javaOpts; + private final ContainerContext containerContext; private VertexTerminationCause terminationCause; public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, String vertexName, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, - Token jobToken, - Credentials fsTokenCredentials, Clock clock, + Credentials credentials, Clock clock, // TODO: Recovery //Map completedTasksFromPreviousRun, // TODO Metrics @@ -386,8 +384,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); - this.fsTokens = fsTokenCredentials; - this.jobToken = jobToken; + this.credentials = credentials; this.committer = new NullVertexOutputCommitter(); this.vertexLocationHint = vertexLocationHint; @@ -409,6 +406,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.processorDescriptor.getUserPayload(), this.vertexId, getApplicationAttemptId()); + this.containerContext = new ContainerContext(this.localResources, + this.credentials, this.environment, this.javaOpts); // This "this leak" is okay because the retained pointer is in an // instance variable. stateMachine = stateMachineFactory.make(this); @@ -1022,16 +1021,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.eventHandler, conf, vertex.taskAttemptListener, - vertex.jobToken, - vertex.fsTokens, vertex.clock, vertex.taskHeartbeatHandler, vertex.appContext, vertex.targetVertices.isEmpty(), locHint, vertex.taskResource, - vertex.localResources, - vertex.environment, - vertex.javaOpts); + vertex.containerContext); vertex.addTask(task); if(LOG.isDebugEnabled()) { LOG.debug("Created task for vertex " + vertex.getVertexId() + ": " + http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java index 21c6035..f66c1a2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java @@ -17,18 +17,12 @@ package org.apache.tez.dag.app.rm; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.TezTaskContext; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.engine.common.security.JobTokenIdentifier; public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { @@ -36,38 +30,29 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { //.... Maybe have the Container talk to the TaskAttempt to pull in the remote task. private final TezTaskAttemptID attemptId; - private final Resource capability; - private final Map localResources; - private final TezTaskContext remoteTaskContext; - private final TaskAttempt taskAttempt; - private final Credentials credentials; - private Token jobToken; private final String[] hosts; private final String[] racks; private final Priority priority; - private final Map environment; - private final Configuration conf; + private final Resource capability; + private final ContainerContext containerContext; + + + private final TezTaskContext remoteTaskContext; + private final TaskAttempt taskAttempt; public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId, Resource capability, - Map localResources, TezTaskContext remoteTaskContext, TaskAttempt ta, - Credentials credentials, Token jobToken, - String[] hosts, String[] racks, Priority priority, - Map environment, Configuration conf) { + String[] hosts, String[] racks, Priority priority, ContainerContext containerContext) { super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST); this.attemptId = attemptId; this.capability = capability; - this.localResources = localResources; this.remoteTaskContext = remoteTaskContext; this.taskAttempt = ta; - this.credentials = credentials; - this.jobToken = jobToken; this.hosts = hosts; this.racks = racks; this.priority = priority; - this.environment = environment; - this.conf = conf; + this.containerContext = containerContext; } public TezTaskAttemptID getAttemptID() { @@ -98,24 +83,8 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { return this.taskAttempt; } - public Credentials getCredentials() { - return this.credentials; - } - - public Token getJobToken() { - return this.jobToken; - } - - public Map getLocalResources() { - return this.localResources; - } - - public Map getEnvironment() { - return this.environment; - } - - public Configuration getConf() { - return this.conf; + public ContainerContext getContainerContext() { + return this.containerContext; } // Parameter replacement: @taskid@ will not be usable http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index ea67d45..1cb6de6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -395,12 +395,8 @@ public class TaskSchedulerEventHandler extends AbstractService sendEvent(new AMContainerEventLaunchRequest( containerId, taskAttempt.getVertexID(), - event.getJobToken(), - // TODO getConf from AMSchedulerEventTALaunchRequest - event.getCredentials(), false, event.getConf(), - taskAttempt.getLocalResources(), - taskAttempt.getEnvironment(), - taskAttempt.getJavaOpts())); + false, + event.getContainerContext())); } sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container)); sendEvent(new AMContainerEventAssignTA(containerId, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java index 293339c..f97863a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java @@ -18,43 +18,24 @@ package org.apache.tez.dag.app.rm.container; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.engine.common.security.JobTokenIdentifier; public class AMContainerEventLaunchRequest extends AMContainerEvent { private final TezVertexID vertexId; - private final Token jobToken; - private final Credentials credentials; private final boolean shouldProfile; - private final Configuration conf; - private final Map localResources; - private final Map environment; - private final String javaOpts; + private final ContainerContext containerContext; public AMContainerEventLaunchRequest(ContainerId containerId, - TezVertexID vertexId, - Token jobToken, - Credentials credentials, boolean shouldProfile, Configuration conf, - Map localResources, - Map environment, String javaOpts) { + TezVertexID vertexId, boolean shouldProfile, + ContainerContext containerContext) { super(containerId, AMContainerEventType.C_LAUNCH_REQUEST); this.vertexId = vertexId; - this.jobToken = jobToken; - this.credentials = credentials; this.shouldProfile = shouldProfile; - this.conf = conf; - this.localResources = localResources; - this.environment = environment; - this.javaOpts = javaOpts; + this.containerContext = containerContext; } public TezDAGID getDAGId() { @@ -65,31 +46,11 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent { return this.vertexId; } - public Token getJobToken() { - return this.jobToken; - } - - public Credentials getCredentials() { - return this.credentials; - } - public boolean shouldProfile() { return this.shouldProfile; } - - public Configuration getConf() { - return this.conf; - } - - public Map getLocalResources() { - return localResources; - } - - public Map getEnvironment() { - return environment; - } - - public String getJavaOpts() { - return javaOpts; + + public ContainerContext getContainerContext() { + return this.containerContext; } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 5651d48..1855dbd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -23,34 +23,28 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.Map.Entry; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; -import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TezEngineChildJVM; -import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.apache.tez.engine.common.security.TokenCache; import org.apache.tez.engine.common.shuffle.server.ShuffleHandler; @@ -85,9 +79,8 @@ public class AMContainerHelpers { * @param applicationACLs */ private static ContainerLaunchContext createCommonContainerLaunchContext( - Map applicationACLs, Configuration conf, - Token jobToken, - TezVertexID vertexId, Credentials credentials, AppContext appContext) { + Map applicationACLs, + Credentials credentials) { // Application resources Map localResources = @@ -107,16 +100,11 @@ public class AMContainerHelpers { // Setup up task credentials buffer Credentials taskCredentials = new Credentials(); - if (UserGroupInformation.isSecurityEnabled()) { - LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #" - + credentials.numberOfSecretKeys() - + " secret keys for NM use for launching container"); - taskCredentials.addAll(credentials); - } - - // LocalStorageToken is needed irrespective of whether security is enabled - // or not. - TokenCache.setJobToken(jobToken, taskCredentials); + // Add tokens if they exist. + LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #" + + credentials.numberOfSecretKeys() + + " secret keys for NM use for launching container"); + taskCredentials.addAll(credentials); DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); LOG.info("Size of containertokens_dob is " @@ -128,7 +116,8 @@ public class AMContainerHelpers { // Add shuffle token LOG.info("Putting shuffle token in serviceData"); serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, - ShuffleHandler.serializeServiceData(jobToken)); + ShuffleHandler.serializeServiceData(TokenCache + .getJobToken(taskCredentials))); } catch (IOException e) { throw new TezUncheckedException(e); @@ -146,9 +135,8 @@ public class AMContainerHelpers { @VisibleForTesting public static ContainerLaunchContext createContainerLaunchContext( Map acls, - ContainerId containerId, Configuration conf, TezVertexID vertexId, - Token jobToken, - Resource assignedCapability, Map localResources, + ContainerId containerId, + Map localResources, Map vertexEnv, String javaOpts, TaskAttemptListener taskAttemptListener, Credentials credentials, @@ -157,7 +145,7 @@ public class AMContainerHelpers { synchronized (commonContainerSpecLock) { if (commonContainerSpec == null) { commonContainerSpec = createCommonContainerLaunchContext( - acls, conf, jobToken, vertexId, credentials, appContext); + acls, credentials); } } @@ -176,7 +164,7 @@ public class AMContainerHelpers { // Set up the launch command List commands = TezEngineChildJVM.getVMCommand( - taskAttemptListener.getAddress(), conf, containerId.toString(), + taskAttemptListener.getAddress(), containerId.toString(), appContext.getApplicationID().toString(), appContext.getApplicationAttemptId().getAttemptId(), shouldProfile, javaOpts); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index f37d0fc..9d44c11 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.tez.common.TezTaskContext; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.dag.event.DiagnosableEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; @@ -337,17 +338,15 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventLaunchRequest event = (AMContainerEventLaunchRequest) cEvent; + ContainerContext containerContext = event.getContainerContext(); container.clc = AMContainerHelpers.createContainerLaunchContext( container.appContext.getApplicationACLs(), - container.getContainerId(), event.getConf(), - event.getVertexId(), - event.getJobToken(), - container.getContainer().getResource(), - event.getLocalResources(), - event.getEnvironment(), - event.getJavaOpts(), - container.taskAttemptListener, event.getCredentials(), + container.getContainerId(), + containerContext.getLocalResources(), + containerContext.getEnvironment(), + containerContext.getJavaOpts(), + container.taskAttemptListener, containerContext.getCredentials(), event.shouldProfile(), container.appContext); // Registering now, so that in case of delayed NM response, the child http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java index b1dd93d..8919698 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java @@ -22,7 +22,6 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Vector; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.YarnTezDagChild; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -66,7 +65,7 @@ public class TezEngineChildJVM { } public static List getVMCommand( - InetSocketAddress taskAttemptListenerAddr, Configuration conf, + InetSocketAddress taskAttemptListenerAddr, String containerIdentifier, String tokenIdentifier, int applicationAttemptNumber, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index b2f7cbd..3274a4a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -62,6 +61,7 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; @@ -83,7 +83,6 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -129,12 +128,9 @@ public class TestTaskAttempt { TezTaskID taskID = new TezTaskID( new TezVertexID(new TezDAGID("1", 1, 1), 1), 1); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - mock(TaskAttemptListener.class), new Configuration(), - mock(Token.class), new Credentials(), new SystemClock(), + mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mock(AppContext.class), - locationHint, Resource.newInstance(1024, 1), - new HashMap(), new HashMap(), - "", false); + locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext()); TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class); @@ -176,11 +172,10 @@ public class TestTaskAttempt { new TezVertexID(new TezDAGID("1", 1, 1), 1), 1); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskAttemptListener.class), new Configuration(), - mock(Token.class), new Credentials(), new SystemClock(), - mock(TaskHeartbeatHandler.class), mock(AppContext.class), - locationHint, Resource.newInstance(1024, 1), - new HashMap(), new HashMap(), - "", false); + new SystemClock(), mock(TaskHeartbeatHandler.class), + mock(AppContext.class), locationHint, false, Resource.newInstance(1024, + 1), createFakeContainerContext()); + TaskAttemptImpl spyTa = spy(taImpl); when(spyTa.resolveHosts(hosts)).thenReturn( resolved.toArray(new String[3])); @@ -318,18 +313,14 @@ public class TestTaskAttempt { TaskLocationHint locationHint = new TaskLocationHint( new HashSet(Arrays.asList(new String[] {"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); - Map localResources = new HashMap(); - Map environment = new HashMap(); - String javaOpts = ""; AppContext mockAppContext = mock(AppContext.class); doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo(); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - taListener, taskConf, mock(Token.class), new Credentials(), - new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext, - locationHint, resource, localResources, - environment, javaOpts, false); + taListener, taskConf, new SystemClock(), + mock(TaskHeartbeatHandler.class), mockAppContext, locationHint, false, + resource, createFakeContainerContext()); NodeId nid = NodeId.newInstance("127.0.0.1", 0); ContainerId contId = ContainerId.newInstance(appAttemptId, 3); @@ -372,9 +363,6 @@ public class TestTaskAttempt { TaskLocationHint locationHint = new TaskLocationHint( new HashSet(Arrays.asList(new String[] {"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); - Map localResources = new HashMap(); - Map environment = new HashMap(); - String javaOpts = ""; NodeId nid = NodeId.newInstance("127.0.0.1", 0); ContainerId contId = ContainerId.newInstance(appAttemptId, 3); @@ -393,10 +381,9 @@ public class TestTaskAttempt { doReturn(containers).when(appCtx).getAllContainers(); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - taListener, taskConf, mock(Token.class), new Credentials(), - new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, - locationHint, resource, localResources, - environment, javaOpts, false); + taListener, taskConf, new SystemClock(), + mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, + resource, createFakeContainerContext()); ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); @@ -467,9 +454,6 @@ public class TestTaskAttempt { TaskLocationHint locationHint = new TaskLocationHint( new HashSet(Arrays.asList(new String[] {"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); - Map localResources = new HashMap(); - Map environment = new HashMap(); - String javaOpts = ""; NodeId nid = NodeId.newInstance("127.0.0.1", 0); ContainerId contId = ContainerId.newInstance(appAttemptId, 3); @@ -488,10 +472,9 @@ public class TestTaskAttempt { doReturn(containers).when(appCtx).getAllContainers(); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - taListener, taskConf, mock(Token.class), new Credentials(), - new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, - locationHint, resource, localResources, - environment, javaOpts, false); + taListener, taskConf, new SystemClock(), + mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, + resource, createFakeContainerContext()); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); // At state STARTING. @@ -532,9 +515,6 @@ public class TestTaskAttempt { TaskLocationHint locationHint = new TaskLocationHint( new HashSet(Arrays.asList(new String[] {"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); - Map localResources = new HashMap(); - Map environment = new HashMap(); - String javaOpts = ""; NodeId nid = NodeId.newInstance("127.0.0.1", 0); ContainerId contId = ContainerId.newInstance(appAttemptId, 3); @@ -553,10 +533,9 @@ public class TestTaskAttempt { doReturn(containers).when(appCtx).getAllContainers(); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - taListener, taskConf, mock(Token.class), new Credentials(), - new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, - locationHint, resource, localResources, - environment, javaOpts, false); + taListener, taskConf, new SystemClock(), + mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, + resource, createFakeContainerContext()); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); // At state STARTING. @@ -599,9 +578,6 @@ public class TestTaskAttempt { TaskLocationHint locationHint = new TaskLocationHint( new HashSet(Arrays.asList(new String[] {"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); - Map localResources = new HashMap(); - Map environment = new HashMap(); - String javaOpts = ""; NodeId nid = NodeId.newInstance("127.0.0.1", 0); ContainerId contId = ContainerId.newInstance(appAttemptId, 3); @@ -620,10 +596,9 @@ public class TestTaskAttempt { doReturn(containers).when(appCtx).getAllContainers(); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - taListener, taskConf, mock(Token.class), new Credentials(), - new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, - locationHint, resource, localResources, - environment, javaOpts, false); + taListener, taskConf, new SystemClock(), + mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, + resource, createFakeContainerContext()); ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); @@ -692,10 +667,6 @@ public class TestTaskAttempt { TaskLocationHint locationHint = new TaskLocationHint( new HashSet(Arrays.asList(new String[] {"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); - Map localResources = - new HashMap(); - Map environment = new HashMap(); - String javaOpts = ""; NodeId nid = NodeId.newInstance("127.0.0.1", 0); ContainerId contId = ContainerId.newInstance(appAttemptId, 3); @@ -714,10 +685,9 @@ public class TestTaskAttempt { doReturn(containers).when(appCtx).getAllContainers(); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - taListener, taskConf, mock(Token.class), new Credentials(), - new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, - locationHint, resource, localResources, - environment, javaOpts, false); + taListener, taskConf, new SystemClock(), + mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, + resource, createFakeContainerContext()); taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); // At state STARTING. @@ -786,16 +756,13 @@ public class TestTaskAttempt { public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, TaskAttemptListener tal, - Configuration conf, Token jobToken, - Credentials credentials, Clock clock, + Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, - TaskLocationHint locationHint, - Resource resource, Map localResources, - Map environment, String javaOpts, boolean isRescheduled) { + TaskLocationHint locationHint, boolean isRescheduled, + Resource resource, ContainerContext containerContext) { super(taskId, attemptNumber, eventHandler, tal, conf, - jobToken, credentials, clock, taskHeartbeatHandler, appContext, - locationHint, resource, localResources, environment, - javaOpts, isRescheduled); + clock, taskHeartbeatHandler, appContext, + locationHint, isRescheduled, resource, containerContext); } @Override @@ -818,6 +785,10 @@ public class TestTaskAttempt { protected void logJobHistoryAttemptUnsuccesfulCompletion( TaskAttemptState state) { } - + } + + private static ContainerContext createFakeContainerContext() { + return new ContainerContext(new HashMap(), + new Credentials(), new HashMap(), ""); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 35b6e7d..ad36b7b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -33,7 +33,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Resource; @@ -45,10 +44,11 @@ import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; -import org.apache.tez.dag.app.dag.TaskTerminationCause; import org.apache.tez.dag.app.dag.TaskStateInternal; +import org.apache.tez.dag.app.dag.TaskTerminationCause; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; @@ -58,7 +58,6 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.junit.Before; import org.junit.Test; @@ -74,7 +73,6 @@ public class TestTaskImpl { private Configuration conf; private TaskAttemptListener taskAttemptListener; private TaskHeartbeatHandler taskHeartbeatHandler; - private Token jobToken; private Credentials credentials; private Clock clock; private TaskLocationHint locationHint; @@ -88,17 +86,16 @@ public class TestTaskImpl { private Map environment; private String javaOpts; private boolean leafVertex; + private ContainerContext containerContext; private MockTaskImpl mockTask; - @SuppressWarnings("unchecked") @Before public void setup() { dispatcher = new InlineDispatcher(); conf = new Configuration(); taskAttemptListener = mock(TaskAttemptListener.class); taskHeartbeatHandler = mock(TaskHeartbeatHandler.class); - jobToken = (Token) mock(Token.class); credentials = null; clock = new SystemClock(); locationHint = new TaskLocationHint(null, null); @@ -112,12 +109,14 @@ public class TestTaskImpl { environment = new HashMap(); javaOpts = ""; leafVertex = false; + containerContext = new ContainerContext(localResources, credentials, + environment, javaOpts); Vertex vertex = mock(Vertex.class); mockTask = new MockTaskImpl(vertexId, partition, - dispatcher.getEventHandler(), conf, taskAttemptListener, jobToken, - credentials, clock, taskHeartbeatHandler, appContext, leafVertex, - locationHint, taskResource, localResources, environment, javaOpts, vertex); + dispatcher.getEventHandler(), conf, taskAttemptListener, clock, + taskHeartbeatHandler, appContext, leafVertex, locationHint, + taskResource, containerContext, vertex); } private TezTaskID getNewTaskID() { @@ -376,17 +375,13 @@ public class TestTaskImpl { public MockTaskImpl(TezVertexID vertexId, int partition, EventHandler eventHandler, Configuration conf, - TaskAttemptListener taskAttemptListener, - Token jobToken, Credentials credentials, - Clock clock, TaskHeartbeatHandler thh, AppContext appContext, - boolean leafVertex, + TaskAttemptListener taskAttemptListener, Clock clock, + TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex, TaskLocationHint locationHint, Resource resource, - Map localResources, - Map environment, String javaOpts, Vertex vertex) { + ContainerContext containerContext, Vertex vertex) { super(vertexId, partition, eventHandler, conf, taskAttemptListener, - jobToken, credentials, clock, thh, appContext, - leafVertex, locationHint, resource, localResources, environment, - javaOpts); + clock, thh, appContext, leafVertex, locationHint, resource, + containerContext); this.vertex = vertex; } @@ -394,9 +389,8 @@ public class TestTaskImpl { protected TaskAttemptImpl createAttempt(int attemptNumber) { MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskAttemptListener, - conf, jobToken, credentials, clock, taskHeartbeatHandler, appContext, - locationHint, taskResource, localResources, - environment, javaOpts, true); + conf, clock, taskHeartbeatHandler, appContext, + locationHint, true, taskResource, containerContext); taskAttempts.add(attempt); return attempt; } @@ -437,17 +431,12 @@ public class TestTaskImpl { private TaskAttemptState state = TaskAttemptState.NEW; public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, - EventHandler eventHandler, TaskAttemptListener tal, - Configuration conf, Token jobToken, - Credentials credentials, Clock clock, TaskHeartbeatHandler thh, - AppContext appContext, - TaskLocationHint locationHing, Resource resource, - Map localResources, - Map environment, String javaOpts, boolean isRescheduled) { - super(taskId, attemptNumber, eventHandler, tal, conf, - jobToken, credentials, clock, thh, appContext, - locationHing, resource, localResources, environment, javaOpts, - isRescheduled); + EventHandler eventHandler, TaskAttemptListener tal, Configuration conf, + Clock clock, TaskHeartbeatHandler thh, AppContext appContext, + TaskLocationHint locationHing, boolean isRescheduled, + Resource resource, ContainerContext containerContext) { + super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh, + appContext, locationHing, isRescheduled, resource, containerContext); } @Override http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 6022a6d..b11aaab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.MRVertexOutputCommitter; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -64,8 +63,8 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.Vertex; -import org.apache.tez.dag.app.dag.VertexTerminationCause; import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.VertexTerminationCause; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; @@ -75,15 +74,14 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; -import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.event.VertexEventTermination; +import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.avro.HistoryEventType; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.engine.common.security.JobTokenIdentifier; import org.apache.tez.engine.records.TezDependentTaskCompletionEvent; import org.apache.tez.engine.records.TezDependentTaskCompletionEvent.Status; import org.junit.After; @@ -104,7 +102,6 @@ public class TestVertexImpl { private DrainDispatcher dispatcher; private TaskAttemptListener taskAttemptListener; private Credentials fsTokens; - private Token jobToken; private Clock clock = new SystemClock(); private TaskHeartbeatHandler thh; private AppContext appContext; @@ -459,7 +456,7 @@ public class TestVertexImpl { String vName = vPlan.getName(); TezVertexID vertexId = new TezVertexID(dagId, i+1); VertexImpl v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf, - dispatcher.getEventHandler(), taskAttemptListener, jobToken, fsTokens, + dispatcher.getEventHandler(), taskAttemptListener, fsTokens, clock, thh, appContext, vertexLocationHint); vertices.put(vName, v); vertexIdMap.put(vertexId, v); @@ -514,7 +511,6 @@ public class TestVertexImpl { dagPlan = createTestDAGPlan(); dispatcher = new DrainDispatcher(); fsTokens = new Credentials(); - jobToken = new Token(); appContext = mock(AppContext.class); DAG dag = mock(DAG.class); doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); @@ -1193,7 +1189,7 @@ public class TestVertexImpl { TezVertexID vId = new TezVertexID(invalidDagId, 1); VertexPlan vPlan = dPlan.getVertex(0); VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf, - dispatcher.getEventHandler(), taskAttemptListener, jobToken, fsTokens, + dispatcher.getEventHandler(), taskAttemptListener, fsTokens, clock, thh, appContext, vertexLocationHint); v.handle(new VertexEvent(vId, VertexEventType.V_INIT)); Assert.assertEquals(VertexState.FAILED, v.getState()); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 7d1b22f..40b4c17 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -38,6 +38,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; import org.apache.tez.dag.app.ContainerHeartbeatHandler; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest; @@ -478,15 +479,15 @@ public class TestContainerReuse { private AMSchedulerEventTALaunchRequest createLaunchRequestEvent( TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority, Configuration conf) { - AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest( - taID, capability, new HashMap(), - new TezEngineTaskContext(taID, "user", "jobName", "vertexName", - new ProcessorDescriptor("processorClassName"), - Collections.singletonList(new InputSpec("vertexName", 1, - "inputClassName")), Collections.singletonList(new OutputSpec( - "vertexName", 1, "outputClassName"))), ta, - new Credentials(), null, hosts, racks, priority, - new HashMap(), conf); + + ContainerContext containerContext = + new ContainerContext(new HashMap(), + new Credentials(), new HashMap(), ""); + AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability, new TezEngineTaskContext(taID, "user", "jobName", "vertexName", + new ProcessorDescriptor("processorClassName"), + Collections.singletonList(new InputSpec("vertexName", 1, + "inputClassName")), Collections.singletonList(new OutputSpec( + "vertexName", 1, "outputClassName"))), ta, hosts, racks, priority, containerContext); return lr; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9da92ac8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index b9e907a..2d2945e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -37,7 +37,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -56,6 +55,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.TezTaskContext; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; +import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed; @@ -67,6 +67,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.engine.common.security.JobTokenIdentifier; +import org.apache.tez.engine.common.security.TokenCache; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -825,11 +826,8 @@ public class TestAMContainer { TezTaskContext tezTaskContext; - Token jobToken; - public AMContainerImpl amContainer; - @SuppressWarnings("unchecked") public WrappedContainer() { applicationID = ApplicationId.newInstance(rmIdentifier, 1); appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1); @@ -864,9 +862,6 @@ public class TestAMContainer { tezTaskContext = mock(TezTaskContext.class); doReturn(taskAttemptID).when(tezTaskContext).getTaskAttemptId(); - - jobToken = (Token) mock(Token.class); - amContainer = new AMContainerImpl(container, chh, tal, appContext); } @@ -896,10 +891,14 @@ public class TestAMContainer { public void launchContainer() { reset(eventHandler); - amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID, - jobToken, new Credentials(), false, new Configuration(), - new HashMap(), new HashMap(), - null)); + Credentials credentials = new Credentials(); + @SuppressWarnings("unchecked") + Token jobToken = mock(Token.class); + TokenCache.setJobToken(jobToken, credentials); + amContainer.handle(new AMContainerEventLaunchRequest(containerID, + vertexID, false, new ContainerContext( + new HashMap(), credentials, + new HashMap(), ""))); } public void assignTaskAttempt(TezTaskAttemptID taID) {