Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1A02517823 for ; Thu, 5 Mar 2015 02:20:15 +0000 (UTC) Received: (qmail 21789 invoked by uid 500); 5 Mar 2015 02:20:15 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 21695 invoked by uid 500); 5 Mar 2015 02:20:15 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 21332 invoked by uid 99); 5 Mar 2015 02:20:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2015 02:20:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 70E16E0FBF; Thu, 5 Mar 2015 02:20:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 05 Mar 2015 02:20:20 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/14] tez git commit: TEZ-2006. Task communication plane needs to be pluggable. (sseth) TEZ-2006. Task communication plane needs to be pluggable. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a3f202f0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a3f202f0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a3f202f0 Branch: refs/heads/TEZ-2003 Commit: a3f202f0d67fe16e5d525e0aab269542be5d03bb Parents: 01422fd Author: Siddharth Seth Authored: Thu Feb 12 11:25:45 2015 -0800 Committer: Siddharth Seth Committed: Wed Mar 4 18:19:42 2015 -0800 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../apache/tez/dag/api/TaskCommunicator.java | 54 ++ .../tez/dag/api/TaskCommunicatorContext.java | 48 ++ .../tez/dag/api/TaskHeartbeatRequest.java | 63 +++ .../tez/dag/api/TaskHeartbeatResponse.java | 39 ++ .../java/org/apache/tez/dag/app/AppContext.java | 3 + .../org/apache/tez/dag/app/DAGAppMaster.java | 5 + .../dag/app/TaskAttemptListenerImpTezDag.java | 501 +++++++------------ .../tez/dag/app/TezTaskCommunicatorImpl.java | 474 ++++++++++++++++++ .../app/launcher/LocalContainerLauncher.java | 10 +- .../tez/dag/app/rm/container/AMContainer.java | 3 +- .../rm/container/AMContainerEventAssignTA.java | 2 + .../dag/app/rm/container/AMContainerImpl.java | 1 + .../apache/tez/dag/app/MockDAGAppMaster.java | 7 +- .../app/TestTaskAttemptListenerImplTezDag.java | 81 +-- 15 files changed, 945 insertions(+), 347 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 1822fcb..d7e4be5 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -1,4 +1,5 @@ ALL CHANGES: TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration. + TEZ-2006. Task communication plane needs to be pluggable. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java new file mode 100644 index 0000000..97f9c16 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TaskSpec; + +// TODO TEZ-2003 Move this into the tez-api module +public abstract class TaskCommunicator extends AbstractService { + public TaskCommunicator(String name) { + super(name); + } + + // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct. + // TODO When talking to an external service, this plugin implementer may need access to a host:port + public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port); + + // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct. + public abstract void registerContainerEnd(ContainerId containerId); + + // TODO TEZ-2003 TaskSpec breakup into a clean interface + // TODO TEZ-2003 Add support for priority + public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, + Map additionalResources, + Credentials credentials, + boolean credentialsChanged); + + // TODO TEZ-2003 Remove reference to TaskAttemptID + public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID); + + // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM. + public abstract InetSocketAddress getAddress(); + + // TODO Eventually. Add methods here to support preemption of tasks. +} http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java new file mode 100644 index 0000000..9b2d889 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +import java.io.IOException; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.records.TezTaskAttemptID; + + +// Do not make calls into this from within a held lock. + +// TODO TEZ-2003 Move this into the tez-api module +public interface TaskCommunicatorContext { + + // TODO TEZ-2003 Add signalling back into this to indicate errors - e.g. Container unregsitered, task no longer running, etc. + + // TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. + + ApplicationAttemptId getApplicationAttemptId(); + Credentials getCredentials(); + + // TODO TEZ-2003 Move to vertex, taskIndex, version + boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; + + TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException; + + boolean isKnownContainer(ContainerId containerId); + + // TODO TEZ-2003 Move to vertex, taskIndex, version + void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId); + + // TODO Eventually Add methods to report availability stats to the scheduler. +} http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java new file mode 100644 index 0000000..f6bc8f0 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +import java.util.List; + +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TezEvent; + +// TODO TEZ-2003 Move this into the tez-api module +public class TaskHeartbeatRequest { + + // TODO TEZ-2003 Ideally containerIdentifier should not be part of the request. + // Replace with a task lookup - vertex name + task index + private final String containerIdentifier; + // TODO TEZ-2003 Get rid of the task attemptId reference if possible + private final TezTaskAttemptID taskAttemptId; + private final List events; + private final int startIndex; + private final int maxEvents; + + + public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List events, int startIndex, + int maxEvents) { + this.containerIdentifier = containerIdentifier; + this.taskAttemptId = taskAttemptId; + this.events = events; + this.startIndex = startIndex; + this.maxEvents = maxEvents; + } + + public String getContainerIdentifier() { + return containerIdentifier; + } + + public TezTaskAttemptID getTaskAttemptId() { + return taskAttemptId; + } + + public List getEvents() { + return events; + } + + public int getStartIndex() { + return startIndex; + } + + public int getMaxEvents() { + return maxEvents; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java new file mode 100644 index 0000000..c82a743 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +import java.util.List; + +import org.apache.tez.runtime.api.impl.TezEvent; + +// TODO TEZ-2003 Move this into the tez-api module +public class TaskHeartbeatResponse { + + private final boolean shouldDie; + private List events; + + public TaskHeartbeatResponse(boolean shouldDie, List events) { + this.shouldDie = shouldDie; + this.events = events; + } + + public boolean isShouldDie() { + return shouldDie; + } + + public List getEvents() { + return events; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 5564809..f22b17a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -102,4 +103,6 @@ public interface AppContext { String[] getLocalDirs(); String getAMUser(); + + Credentials getAppCredentials(); } http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index b347908..0b03d0a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1379,6 +1379,11 @@ public class DAGAppMaster extends AbstractService { } @Override + public Credentials getAppCredentials() { + return amCredentials; + } + + @Override public Map getApplicationACLs() { if (getServiceState() != STATE.STARTED) { throw new TezUncheckedException( http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 4cb5e99..08b50ba 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -18,228 +18,209 @@ package org.apache.tez.dag.app; import java.io.IOException; -import java.net.InetAddress; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.tez.common.ContainerContext; -import org.apache.tez.common.ContainerTask; -import org.apache.tez.common.TezConverterUtils; -import org.apache.tez.common.TezLocalResource; -import org.apache.tez.common.TezTaskUmbilicalProtocol; -import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TaskHeartbeatRequest; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; +import org.apache.tez.dag.app.rm.TaskSchedulerService; import org.apache.tez.dag.app.rm.container.AMContainerTask; -import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; -import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.apache.tez.common.security.JobTokenSecretManager; -import com.google.common.collect.Maps; @SuppressWarnings("unchecked") +@InterfaceAudience.Private public class TaskAttemptListenerImpTezDag extends AbstractService implements - TezTaskUmbilicalProtocol, TaskAttemptListener { - - private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask( - null, true, null, null, false); + TaskAttemptListener, TaskCommunicatorContext { private static final Log LOG = LogFactory .getLog(TaskAttemptListenerImpTezDag.class); private final AppContext context; + private TaskCommunicator taskCommunicator; protected final TaskHeartbeatHandler taskHeartbeatHandler; protected final ContainerHeartbeatHandler containerHeartbeatHandler; - private final JobTokenSecretManager jobTokenSecretManager; - private InetSocketAddress address; - private Server server; - - static class ContainerInfo { - ContainerInfo() { - this.lastReponse = null; - this.lastRequestId = 0; - this.amContainerTask = null; - this.taskPulled = false; + + private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null); + + private final ConcurrentMap registeredAttempts = + new ConcurrentHashMap(); + private final ConcurrentMap registeredContainers = + new ConcurrentHashMap(); + + // Defined primarily to work around ConcurrentMaps not accepting null values + private static final class ContainerInfo { + TezTaskAttemptID taskAttemptId; + ContainerInfo(TezTaskAttemptID taskAttemptId) { + this.taskAttemptId = taskAttemptId; } - long lastRequestId; - TezHeartbeatResponse lastReponse; - AMContainerTask amContainerTask; - boolean taskPulled; } - private ConcurrentMap attemptToInfoMap = - new ConcurrentHashMap(); + private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null); - private ConcurrentHashMap registeredContainers = - new ConcurrentHashMap(); public TaskAttemptListenerImpTezDag(AppContext context, - TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - JobTokenSecretManager jobTokenSecretManager) { + TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, + // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager. + JobTokenSecretManager jobTokenSecretManager) { super(TaskAttemptListenerImpTezDag.class.getName()); this.context = context; - this.jobTokenSecretManager = jobTokenSecretManager; this.taskHeartbeatHandler = thh; this.containerHeartbeatHandler = chh; + this.taskCommunicator = new TezTaskCommunicatorImpl(this); } @Override - public void serviceStart() { - startRpcServer(); - } - - protected void startRpcServer() { - Configuration conf = getConfig(); - if (!conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) { - try { - server = new RPC.Builder(conf) - .setProtocol(TezTaskUmbilicalProtocol.class) - .setBindAddress("0.0.0.0") - .setPort(0) - .setInstance(this) - .setNumHandlers( - conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, - TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) - .setSecretManager(jobTokenSecretManager).build(); - - // Enable service authorization? - if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, - false)) { - refreshServiceAcls(conf, new TezAMPolicyProvider()); - } - - server.start(); - this.address = NetUtils.getConnectAddress(server); - } catch (IOException e) { - throw new TezUncheckedException(e); - } + public void serviceInit(Configuration conf) { + String taskCommClassName = conf.get(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS); + if (taskCommClassName == null) { + LOG.info("Using Default Task Communicator"); + this.taskCommunicator = new TezTaskCommunicatorImpl(this); } else { + LOG.info("Using TaskCommunicator: " + taskCommClassName); + Class taskCommClazz = (Class) ReflectionUtils + .getClazz(taskCommClassName); try { - this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0); - } catch (UnknownHostException e) { + Constructor ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class); + ctor.setAccessible(true); + this.taskCommunicator = ctor.newInstance(this); + } catch (NoSuchMethodException e) { + throw new TezUncheckedException(e); + } catch (InvocationTargetException e) { + throw new TezUncheckedException(e); + } catch (InstantiationException e) { + throw new TezUncheckedException(e); + } catch (IllegalAccessException e) { throw new TezUncheckedException(e); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Not starting TaskAttemptListener RPC in LocalMode"); } } } - void refreshServiceAcls(Configuration configuration, - PolicyProvider policyProvider) { - this.server.refreshServiceAcl(configuration, policyProvider); + @Override + public void serviceStart() { + taskCommunicator.init(getConfig()); + taskCommunicator.start(); } @Override public void serviceStop() { - stopRpcServer(); - } - - protected void stopRpcServer() { - if (server != null) { - server.stop(); + if (taskCommunicator != null) { + taskCommunicator.stop(); + taskCommunicator = null; } } - public InetSocketAddress getAddress() { - return address; - } - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - return versionID; + public ApplicationAttemptId getApplicationAttemptId() { + return context.getApplicationAttemptId(); } @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - return ProtocolSignature.getProtocolSignature(this, protocol, - clientVersion, clientMethodsHash); + public Credentials getCredentials() { + return context.getAppCredentials(); } @Override - public ContainerTask getTask(ContainerContext containerContext) - throws IOException { + public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) + throws IOException, TezException { + ContainerId containerId = ConverterUtils.toContainerId(request + .getContainerIdentifier()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received heartbeat from container" + + ", request=" + request); + } - ContainerTask task = null; + if (!registeredContainers.containsKey(containerId)) { + LOG.warn("Received task heartbeat from unknown container with id: " + containerId + + ", asking it to die"); + return RESPONSE_SHOULD_DIE; + } - if (containerContext == null || containerContext.getContainerIdentifier() == null) { - LOG.info("Invalid task request with an empty containerContext or containerId"); - task = TASK_FOR_INVALID_JVM; - } else { - ContainerId containerId = ConverterUtils.toContainerId(containerContext - .getContainerIdentifier()); + // A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container + // meanwhile. If the decision is processed through the pipeline before the heartbeat is processed, + // the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system + // know how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events). + // So - avoiding synchronization. + + pingContainerHeartbeatHandler(containerId); + List outEvents = null; + TezTaskAttemptID taskAttemptID = request.getTaskAttemptId(); + if (taskAttemptID != null) { + ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID); + if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) { + // This can happen when a task heartbeats. Meanwhile the container is unregistered. + // The information will eventually make it through to the plugin via a corresponding unregister. + // There's a race in that case between the unregister making it through, and this method returning. + // TODO TEZ-2003. An exception back is likely a better approach than sending a shouldDie = true, + // so that the plugin can handle the scenario. Alternately augment the response with error codes. + // Error codes would be better than exceptions. + LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats"); + return RESPONSE_SHOULD_DIE; + } + + List inEvents = request.getEvents(); if (LOG.isDebugEnabled()) { - LOG.debug("Container with id: " + containerId + " asked for a task"); + LOG.debug("Ping from " + taskAttemptID.toString() + + " events: " + (inEvents != null ? inEvents.size() : -1)); } - if (!registeredContainers.containsKey(containerId)) { - if(context.getAllContainers().get(containerId) == null) { - LOG.info("Container with id: " + containerId - + " is invalid and will be killed"); - } else { - LOG.info("Container with id: " + containerId - + " is valid, but no longer registered, and will be killed"); - } - task = TASK_FOR_INVALID_JVM; - } else { - pingContainerHeartbeatHandler(containerId); - task = getContainerTask(containerId); - if (task == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("No task current assigned to Container with id: " + containerId); - } - } else if (task == TASK_FOR_INVALID_JVM) { - LOG.info("Container with id: " + containerId - + " is valid, but no longer registered, and will be killed. Race condition."); - } else { - context.getEventHandler().handle( - new TaskAttemptEventStartedRemotely(task.getTaskSpec() - .getTaskAttemptID(), containerId, context - .getApplicationACLs())); - LOG.info("Container with id: " + containerId + " given task: " - + task.getTaskSpec().getTaskAttemptID()); - } + if (inEvents != null && !inEvents.isEmpty()) { + TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); + context.getEventHandler().handle( + new VertexEventRouteEvent(vertexId, inEvents)); } + taskHeartbeatHandler.pinged(taskAttemptID); + outEvents = context + .getCurrentDAG() + .getVertex(taskAttemptID.getTaskID().getVertexID()) + .getTask(taskAttemptID.getTaskID()) + .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), + request.getMaxEvents()); } - if (LOG.isDebugEnabled()) { - LOG.debug("getTask returning task: " + task); - } - return task; + return new TaskHeartbeatResponse(false, outEvents); + } + + @Override + public boolean isKnownContainer(ContainerId containerId) { + return context.getAllContainers().get(containerId) != null; + } + + @Override + public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) { + context.getEventHandler().handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); + pingContainerHeartbeatHandler(containerId); } /** * Child checking whether it can commit. - * + *

*
* Repeatedly polls the ApplicationMaster whether it * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the @@ -262,72 +243,91 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } @Override - public void unregisterTaskAttempt(TezTaskAttemptID attemptId) { - ContainerId containerId = attemptToInfoMap.get(attemptId); - if(containerId == null) { - LOG.warn("Unregister task attempt: " + attemptId + " from unknown container"); - return; - } - ContainerInfo containerInfo = registeredContainers.get(containerId); - if(containerInfo == null) { - LOG.warn("Unregister task attempt: " + attemptId + - " from non-registered container: " + containerId); - return; - } - synchronized (containerInfo) { - containerInfo.amContainerTask = null; - attemptToInfoMap.remove(attemptId); - } - + public InetSocketAddress getAddress() { + return taskCommunicator.getAddress(); } + // The TaskAttemptListener register / unregister methods in this class are not thread safe. + // The Tez framework should not invoke these methods from multiple threads. @Override public void registerRunningContainer(ContainerId containerId) { if (LOG.isDebugEnabled()) { - LOG.debug("ContainerId: " + containerId - + " registered with TaskAttemptListener"); + LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener"); } - ContainerInfo oldInfo = registeredContainers.put(containerId, new ContainerInfo()); - if(oldInfo != null) { + ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO); + if (oldInfo != null) { throw new TezUncheckedException( "Multiple registrations for containerId: " + containerId); } + NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId(); + taskCommunicator.registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort()); + } + + @Override + public void unregisterRunningContainer(ContainerId containerId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId); + } + ContainerInfo containerInfo = registeredContainers.remove(containerId); + if (containerInfo.taskAttemptId != null) { + registeredAttempts.remove(containerInfo.taskAttemptId); + } + taskCommunicator.registerContainerEnd(containerId); } @Override public void registerTaskAttempt(AMContainerTask amContainerTask, - ContainerId containerId) { + ContainerId containerId) { ContainerInfo containerInfo = registeredContainers.get(containerId); - if(containerInfo == null) { + if (containerInfo == null) { throw new TezUncheckedException("Registering task attempt: " + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId); } - synchronized (containerInfo) { - if(containerInfo.amContainerTask != null) { - throw new TezUncheckedException("Registering task attempt: " - + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId - + " with existing assignment to: " + containerInfo.amContainerTask.getTask().getTaskAttemptID()); - } - containerInfo.amContainerTask = amContainerTask; - containerInfo.taskPulled = false; - - ContainerId containerIdFromMap = - attemptToInfoMap.put(amContainerTask.getTask().getTaskAttemptID(), containerId); - if(containerIdFromMap != null) { - throw new TezUncheckedException("Registering task attempt: " - + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId - + " when already assigned to: " + containerIdFromMap); - } + if (containerInfo.taskAttemptId != null) { + throw new TezUncheckedException("Registering task attempt: " + + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + + " with existing assignment to: " + + containerInfo.taskAttemptId); + } + + if (containerInfo.taskAttemptId != null) { + throw new TezUncheckedException("Registering task attempt: " + + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + + " with existing assignment to: " + + containerInfo.taskAttemptId); } + + // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. + registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID())); + + ContainerId containerIdFromMap = registeredAttempts.put( + amContainerTask.getTask().getTaskAttemptID(), containerId); + if (containerIdFromMap != null) { + throw new TezUncheckedException("Registering task attempt: " + + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + + " when already assigned to: " + containerIdFromMap); + } + taskCommunicator.registerRunningTaskAttempt(containerId, amContainerTask.getTask(), + amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), + amContainerTask.haveCredentialsChanged()); } @Override - public void unregisterRunningContainer(ContainerId containerId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Unregistering Container from TaskAttemptListener: " - + containerId); + public void unregisterTaskAttempt(TezTaskAttemptID attemptId) { + ContainerId containerId = registeredAttempts.remove(attemptId); + if (containerId == null) { + LOG.warn("Unregister task attempt: " + attemptId + " from unknown container"); + return; + } + ContainerInfo containerInfo = registeredContainers.get(containerId); + if (containerInfo == null) { + LOG.warn("Unregister task attempt: " + attemptId + + " from non-registered container: " + containerId); + return; } - registeredContainers.remove(containerId); + // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. + registeredContainers.put(containerId, NULL_CONTAINER_INFO); + taskCommunicator.unregisterRunningTaskAttempt(attemptId); } private void pingContainerHeartbeatHandler(ContainerId containerId) { @@ -335,7 +335,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) { - ContainerId containerId = attemptToInfoMap.get(taskAttemptId); + ContainerId containerId = registeredAttempts.get(taskAttemptId); if (containerId != null) { containerHeartbeatHandler.pinged(containerId); } else { @@ -344,124 +344,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } - @Override - public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) - throws IOException, TezException { - ContainerId containerId = ConverterUtils.toContainerId(request - .getContainerIdentifier()); - long requestId = request.getRequestId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Received heartbeat from container" - + ", request=" + request); - } - - ContainerInfo containerInfo = registeredContainers.get(containerId); - if(containerInfo == null) { - LOG.warn("Received task heartbeat from unknown container with id: " + containerId + - ", asking it to die"); - TezHeartbeatResponse response = new TezHeartbeatResponse(); - response.setLastRequestId(requestId); - response.setShouldDie(); - return response; - } - - synchronized (containerInfo) { - pingContainerHeartbeatHandler(containerId); - - if(containerInfo.lastRequestId == requestId) { - LOG.warn("Old sequenceId received: " + requestId - + ", Re-sending last response to client"); - return containerInfo.lastReponse; - } - - TezHeartbeatResponse response = new TezHeartbeatResponse(); - response.setLastRequestId(requestId); - - TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID(); - if (taskAttemptID != null) { - ContainerId containerIdFromMap = attemptToInfoMap.get(taskAttemptID); - if(containerIdFromMap == null || !containerIdFromMap.equals(containerId)) { - throw new TezException("Attempt " + taskAttemptID - + " is not recognized for heartbeat"); - } - - if(containerInfo.lastRequestId+1 != requestId) { - throw new TezException("Container " + containerId - + " has invalid request id. Expected: " - + containerInfo.lastRequestId+1 - + " and actual: " + requestId); - } - - List inEvents = request.getEvents(); - if (LOG.isDebugEnabled()) { - LOG.debug("Ping from " + taskAttemptID.toString() + - " events: " + (inEvents != null? inEvents.size() : -1)); - } - if(inEvents!=null && !inEvents.isEmpty()) { - TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID(); - context.getEventHandler().handle( - new VertexEventRouteEvent(vertexId, inEvents)); - } - taskHeartbeatHandler.pinged(taskAttemptID); - List outEvents = context - .getCurrentDAG() - .getVertex(taskAttemptID.getTaskID().getVertexID()) - .getTask(taskAttemptID.getTaskID()) - .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), - request.getMaxEvents()); - response.setEvents(outEvents); - } - containerInfo.lastRequestId = requestId; - containerInfo.lastReponse = response; - return response; - } - } - - private Map convertLocalResourceMap(Map ylrs) - throws IOException { - Map tlrs = Maps.newHashMap(); - if (ylrs != null) { - for (Entry ylrEntry : ylrs.entrySet()) { - TezLocalResource tlr; - try { - tlr = TezConverterUtils.convertYarnLocalResourceToTez(ylrEntry.getValue()); - } catch (URISyntaxException e) { - throw new IOException(e); - } - tlrs.put(ylrEntry.getKey(), tlr); - } - } - return tlrs; - } - - private ContainerTask getContainerTask(ContainerId containerId) throws IOException { - ContainerTask containerTask = null; - ContainerInfo containerInfo = registeredContainers.get(containerId); - if (containerInfo == null) { - // This can happen if an unregisterTask comes in after we've done the initial checks for - // registered containers. (Race between getTask from the container, and a potential STOP_CONTAINER - // from somewhere within the AM) - // Implies that an un-registration has taken place and the container needs to be asked to die. - LOG.info("Container with id: " + containerId - + " is valid, but no longer registered, and will be killed"); - containerTask = TASK_FOR_INVALID_JVM; - } else { - synchronized (containerInfo) { - if (containerInfo.amContainerTask != null) { - if (!containerInfo.taskPulled) { - containerInfo.taskPulled = true; - AMContainerTask amContainerTask = containerInfo.amContainerTask; - containerTask = new ContainerTask(amContainerTask.getTask(), false, - convertLocalResourceMap(amContainerTask.getAdditionalResources()), - amContainerTask.getCredentials(), amContainerTask.haveCredentialsChanged()); - } else { - containerTask = null; - } - } else { - containerTask = null; - } - } - } - return containerTask; + public TaskCommunicator getTaskCommunicator() { + return taskCommunicator; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java new file mode 100644 index 0000000..5652937 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -0,0 +1,474 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tez.common.*; +import org.apache.tez.common.ContainerContext; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.api.TaskHeartbeatRequest; +import org.apache.tez.dag.api.TaskHeartbeatResponse; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; + +@InterfaceAudience.Private +public class TezTaskCommunicatorImpl extends TaskCommunicator { + + private static final Log LOG = LogFactory.getLog(TezTaskCommunicatorImpl.class); + + private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask( + null, true, null, null, false); + + private final TaskCommunicatorContext taskCommunicatorContext; + + private final ConcurrentMap registeredContainers = + new ConcurrentHashMap(); + private final ConcurrentMap attemptToContainerMap = + new ConcurrentHashMap(); + + private final TezTaskUmbilicalProtocol taskUmbilical; + private InetSocketAddress address; + private Server server; + + private static final class ContainerInfo { + + ContainerInfo(ContainerId containerId) { + this.containerId = containerId; + } + + ContainerId containerId; + TezHeartbeatResponse lastResponse = null; + TaskSpec taskSpec = null; + long lastRequestId = 0; + Map additionalLRs = null; + Credentials credentials = null; + boolean credentialsChanged = false; + boolean taskPulled = false; + + void reset() { + taskSpec = null; + additionalLRs = null; + credentials = null; + credentialsChanged = false; + taskPulled = false; + } + } + + + + /** + * Construct the service. + */ + public TezTaskCommunicatorImpl(TaskCommunicatorContext taskCommunicatorContext) { + super(TezTaskCommunicatorImpl.class.getName()); + this.taskCommunicatorContext = taskCommunicatorContext; + this.taskUmbilical = new TezTaskUmbilicalProtocolImpl(); + } + + + @Override + public void serviceStart() { + + startRpcServer(); + } + + @Override + public void serviceStop() { + stopRpcServer(); + } + + protected void startRpcServer() { + Configuration conf = getConfig(); + if (!conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) { + try { + JobTokenSecretManager jobTokenSecretManager = + new JobTokenSecretManager(); + Token sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials()); + jobTokenSecretManager.addTokenForJob( + taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(), sessionToken); + + server = new RPC.Builder(conf) + .setProtocol(TezTaskUmbilicalProtocol.class) + .setBindAddress("0.0.0.0") + .setPort(0) + .setInstance(taskUmbilical) + .setNumHandlers( + conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, + TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) + .setSecretManager(jobTokenSecretManager).build(); + + // Enable service authorization? + if (conf.getBoolean( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + false)) { + refreshServiceAcls(conf, new TezAMPolicyProvider()); + } + + server.start(); + this.address = NetUtils.getConnectAddress(server); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + } else { + try { + this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0); + } catch (UnknownHostException e) { + throw new TezUncheckedException(e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Not starting TaskAttemptListener RPC in LocalMode"); + } + } + } + + protected void stopRpcServer() { + if (server != null) { + server.stop(); + server = null; + } + } + + private void refreshServiceAcls(Configuration configuration, + PolicyProvider policyProvider) { + this.server.refreshServiceAcl(configuration, policyProvider); + } + + @Override + public void registerRunningContainer(ContainerId containerId, String host, int port) { + ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId)); + if (oldInfo != null) { + throw new TezUncheckedException("Multiple registrations for containerId: " + containerId); + } + } + + @Override + public void registerContainerEnd(ContainerId containerId) { + ContainerInfo containerInfo = registeredContainers.remove(containerId); + if (containerInfo != null) { + synchronized(containerInfo) { + if (containerInfo.taskSpec != null && containerInfo.taskSpec.getTaskAttemptID() != null) { + attemptToContainerMap.remove(containerInfo.taskSpec.getTaskAttemptID()); + } + } + } + } + + @Override + public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, + Map additionalResources, + Credentials credentials, boolean credentialsChanged) { + + ContainerInfo containerInfo = registeredContainers.get(containerId); + Preconditions.checkNotNull(containerInfo, + "Cannot register task attempt: " + taskSpec.getTaskAttemptID() + " to unknown container: " + + containerId); + synchronized (containerInfo) { + if (containerInfo.taskSpec != null) { + throw new TezUncheckedException( + "Cannot register task: " + taskSpec.getTaskAttemptID() + " to container: " + + containerId + " , with pre-existing assignment: " + + containerInfo.taskSpec.getTaskAttemptID()); + } + containerInfo.taskSpec = taskSpec; + containerInfo.additionalLRs = additionalResources; + containerInfo.credentials = credentials; + containerInfo.credentialsChanged = credentialsChanged; + containerInfo.taskPulled = false; + + ContainerId oldId = attemptToContainerMap.putIfAbsent(new TaskAttempt(taskSpec.getTaskAttemptID()), containerId); + if (oldId != null) { + throw new TezUncheckedException( + "Attempting to register an already registered taskAttempt with id: " + + taskSpec.getTaskAttemptID() + " to containerId: " + containerId + + ". Already registered to containerId: " + oldId); + } + } + + } + + @Override + public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { + TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID); + ContainerId containerId = attemptToContainerMap.remove(taskAttempt); + if(containerId == null) { + LOG.warn("Unregister task attempt: " + taskAttempt + " from unknown container"); + return; + } + ContainerInfo containerInfo = registeredContainers.get(containerId); + if (containerInfo == null) { + LOG.warn("Unregister task attempt: " + taskAttempt + + " from non-registered container: " + containerId); + return; + } + synchronized (containerInfo) { + containerInfo.reset(); + attemptToContainerMap.remove(taskAttempt); + } + } + + @Override + public InetSocketAddress getAddress() { + return address; + } + + public TezTaskUmbilicalProtocol getUmbilical() { + return this.taskUmbilical; + } + + private class TezTaskUmbilicalProtocolImpl implements TezTaskUmbilicalProtocol { + + @Override + public ContainerTask getTask(ContainerContext containerContext) throws IOException { + ContainerTask task = null; + if (containerContext == null || containerContext.getContainerIdentifier() == null) { + LOG.info("Invalid task request with an empty containerContext or containerId"); + task = TASK_FOR_INVALID_JVM; + } else { + ContainerId containerId = ConverterUtils.toContainerId(containerContext + .getContainerIdentifier()); + if (LOG.isDebugEnabled()) { + LOG.debug("Container with id: " + containerId + " asked for a task"); + } + task = getContainerTask(containerId); + if (task != null && !task.shouldDie()) { + taskCommunicatorContext + .taskStartedRemotely(task.getTaskSpec().getTaskAttemptID(), containerId); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("getTask returning task: " + task); + } + return task; + } + + @Override + public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { + return taskCommunicatorContext.canCommit(taskAttemptId); + } + + @Override + public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, + TezException { + ContainerId containerId = ConverterUtils.toContainerId(request.getContainerIdentifier()); + long requestId = request.getRequestId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Received heartbeat from container" + + ", request=" + request); + } + + ContainerInfo containerInfo = registeredContainers.get(containerId); + if (containerInfo == null) { + LOG.warn("Received task heartbeat from unknown container with id: " + containerId + + ", asking it to die"); + TezHeartbeatResponse response = new TezHeartbeatResponse(); + response.setLastRequestId(requestId); + response.setShouldDie(); + return response; + } + + synchronized (containerInfo) { + if (containerInfo.lastRequestId == requestId) { + LOG.warn("Old sequenceId received: " + requestId + + ", Re-sending last response to client"); + return containerInfo.lastResponse; + } + } + + TaskHeartbeatResponse tResponse = null; + + + TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID(); + if (taskAttemptID != null) { + synchronized (containerInfo) { + ContainerId containerIdFromMap = attemptToContainerMap.get(new TaskAttempt(taskAttemptID)); + if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) { + throw new TezException("Attempt " + taskAttemptID + + " is not recognized for heartbeat"); + } + + if (containerInfo.lastRequestId + 1 != requestId) { + throw new TezException("Container " + containerId + + " has invalid request id. Expected: " + + containerInfo.lastRequestId + 1 + + " and actual: " + requestId); + } + } + TaskHeartbeatRequest tRequest = new TaskHeartbeatRequest(request.getContainerIdentifier(), + request.getCurrentTaskAttemptID(), request.getEvents(), request.getStartIndex(), + request.getMaxEvents()); + tResponse = taskCommunicatorContext.heartbeat(tRequest); + } + TezHeartbeatResponse response; + if (tResponse == null) { + response = new TezHeartbeatResponse(); + } else { + response = new TezHeartbeatResponse(tResponse.getEvents()); + } + response.setLastRequestId(requestId); + containerInfo.lastRequestId = requestId; + containerInfo.lastResponse = response; + return response; + } + + + // TODO Remove this method once we move to the Protobuf RPC engine + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return versionID; + } + + // TODO Remove this method once we move to the Protobuf RPC engine + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, + clientVersion, clientMethodsHash); + } + } + + private ContainerTask getContainerTask(ContainerId containerId) throws IOException { + ContainerInfo containerInfo = registeredContainers.get(containerId); + ContainerTask task = null; + if (containerInfo == null) { + if (taskCommunicatorContext.isKnownContainer(containerId)) { + LOG.info("Container with id: " + containerId + + " is valid, but no longer registered, and will be killed"); + } else { + LOG.info("Container with id: " + containerId + + " is invalid and will be killed"); + } + task = TASK_FOR_INVALID_JVM; + } else { + synchronized (containerInfo) { + if (containerInfo.taskSpec != null) { + if (!containerInfo.taskPulled) { + containerInfo.taskPulled = true; + task = constructContainerTask(containerInfo); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Task " + containerInfo.taskSpec.getTaskAttemptID() + + " already sent to container: " + containerId); + } + task = null; + } + } else { + task = null; + if (LOG.isDebugEnabled()) { + LOG.debug("No task assigned yet for running container: " + containerId); + } + } + } + } + return task; + } + + private ContainerTask constructContainerTask(ContainerInfo containerInfo) throws IOException { + return new ContainerTask(containerInfo.taskSpec, false, + convertLocalResourceMap(containerInfo.additionalLRs), containerInfo.credentials, + containerInfo.credentialsChanged); + } + + private Map convertLocalResourceMap(Map ylrs) + throws IOException { + Map tlrs = Maps.newHashMap(); + if (ylrs != null) { + for (Map.Entry ylrEntry : ylrs.entrySet()) { + TezLocalResource tlr; + try { + tlr = TezConverterUtils.convertYarnLocalResourceToTez(ylrEntry.getValue()); + } catch (URISyntaxException e) { + throw new IOException(e); + } + tlrs.put(ylrEntry.getKey(), tlr); + } + } + return tlrs; + } + + + // Holder for Task information, which eventually will likely be VertexImplm taskIndex, attemptIndex + private static class TaskAttempt { + // TODO TEZ-2003 Change this to work with VertexName, int id, int version + // TODO TEZ-2003 Avoid constructing this unit all over the place + private TezTaskAttemptID taskAttemptId; + + TaskAttempt(TezTaskAttemptID taskAttemptId) { + this.taskAttemptId = taskAttemptId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TaskAttempt)) { + return false; + } + + TaskAttempt that = (TaskAttempt) o; + + if (!taskAttemptId.equals(that.taskAttemptId)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return taskAttemptId.hashCode(); + } + + @Override + public String toString() { + return "TaskAttempt{" + "taskAttemptId=" + taskAttemptId + '}'; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 13601fe..7fc472b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -58,6 +58,8 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskAttemptListenerImpTezDag; +import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; @@ -85,7 +87,7 @@ public class LocalContainerLauncher extends AbstractService implements private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class); private final AppContext context; - private final TaskAttemptListener taskAttemptListener; + private final TezTaskUmbilicalProtocol taskUmbilicalProtocol; private final AtomicBoolean serviceStopped = new AtomicBoolean(false); private final String workingDirectory; private final Map localEnv = new HashMap(); @@ -112,7 +114,9 @@ public class LocalContainerLauncher extends AbstractService implements String workingDirectory) throws UnknownHostException { super(LocalContainerLauncher.class.getName()); this.context = context; - this.taskAttemptListener = taskAttemptListener; + TaskAttemptListenerImpTezDag taListener = (TaskAttemptListenerImpTezDag)taskAttemptListener; + TezTaskCommunicatorImpl taskComm = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(); + this.taskUmbilicalProtocol = taskComm.getUmbilical(); this.workingDirectory = workingDirectory; AuxiliaryServiceHelper.setServiceDataIntoEnv( ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv); @@ -205,7 +209,7 @@ public class LocalContainerLauncher extends AbstractService implements tezChild = createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier, context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(), - (TezTaskUmbilicalProtocol) taskAttemptListener, + taskUmbilicalProtocol, TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array())); } catch (InterruptedException e) { handleLaunchFailed(e, event.getContainerId()); http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java index a6b403d..0fc2e12 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -32,5 +33,5 @@ public interface AMContainer extends EventHandler{ public Container getContainer(); public List getAllTaskAttempts(); public TezTaskAttemptID getCurrentTaskAttempt(); - + } http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java index a363168..9380cc9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java @@ -27,6 +27,8 @@ import org.apache.tez.runtime.api.impl.TaskSpec; public class AMContainerEventAssignTA extends AMContainerEvent { + // TODO TEZ-2003. Add the task priority to this event. + private final TezTaskAttemptID attemptId; // TODO Maybe have tht TAL pull the remoteTask from the TaskAttempt itself ? private final TaskSpec remoteTaskSpec; http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index f72e62a..2aebaa4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 04a47c6..cafcd04 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.client.TezApiVersionInfo; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; +import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.launcher.ContainerLauncher; @@ -86,6 +88,7 @@ public class MockDAGAppMaster extends DAGAppMaster { Map containers = Maps.newConcurrentMap(); TaskAttemptListenerImpTezDag taListener; + TezTaskCommunicatorImpl taskCommunicator; AtomicBoolean startScheduling = new AtomicBoolean(true); AtomicBoolean goFlag; @@ -126,6 +129,7 @@ public class MockDAGAppMaster extends DAGAppMaster { @Override public void serviceStart() throws Exception { taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener(); + taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(); eventHandlingThread = new Thread(this); eventHandlingThread.start(); } @@ -236,7 +240,8 @@ public class MockDAGAppMaster extends DAGAppMaster { if (cData.taId == null) { // if container is not assigned a task, ask for a task try { - ContainerTask cTask = taListener.getTask(new ContainerContext(cId.toString())); + ContainerTask cTask = + taskCommunicator.getUmbilical().getTask(new ContainerContext(cId.toString())); if (cTask == null) { continue; } http://git-wip-us.apache.org/repos/asf/tez/blob/a3f202f0/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index f0f7dc5..843f88a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -1,16 +1,16 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.tez.dag.app; @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -28,11 +29,14 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; -import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.app.rm.container.AMContainerMap; @@ -55,10 +59,18 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(dag).when(appContext).getCurrentDAG(); doReturn(appAcls).when(appContext).getApplicationACLs(); doReturn(amContainerMap).when(appContext).getAllContainers(); + NodeId nodeId = NodeId.newInstance("localhost", 0); + AMContainer amContainer = mock(AMContainer.class); + Container container = mock(Container.class); + doReturn(nodeId).when(container).getNodeId(); + doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); + doReturn(container).when(amContainer).getContainer(); TaskAttemptListenerImpTezDag taskAttemptListener = - new TaskAttemptListenerImplForTest(appContext, mock(TaskHeartbeatHandler.class), + new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null); + TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(); + TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); TaskSpec taskSpec = mock(TaskSpec.class); @@ -69,33 +81,31 @@ public class TestTaskAttemptListenerImplTezDag { ContainerId containerId1 = createContainerId(appId, 1); - doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1); ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); - containerTask = taskAttemptListener.getTask(containerContext1); + containerTask = tezUmbilical.getTask(containerContext1); assertTrue(containerTask.shouldDie()); ContainerId containerId2 = createContainerId(appId, 2); - doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId2); ContainerContext containerContext2 = new ContainerContext(containerId2.toString()); taskAttemptListener.registerRunningContainer(containerId2); - containerTask = taskAttemptListener.getTask(containerContext2); + containerTask = tezUmbilical.getTask(containerContext2); assertNull(containerTask); // Valid task registered taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2); - containerTask = taskAttemptListener.getTask(containerContext2); + containerTask = tezUmbilical.getTask(containerContext2); assertFalse(containerTask.shouldDie()); assertEquals(taskSpec, containerTask.getTaskSpec()); // Task unregistered. Should respond to heartbeats taskAttemptListener.unregisterTaskAttempt(taskAttemptId); - containerTask = taskAttemptListener.getTask(containerContext2); + containerTask = tezUmbilical.getTask(containerContext2); assertNull(containerTask); // Container unregistered. Should send a shouldDie = true taskAttemptListener.unregisterRunningContainer(containerId2); - containerTask = taskAttemptListener.getTask(containerContext2); + containerTask = tezUmbilical.getTask(containerContext2); assertTrue(containerTask.shouldDie()); ContainerId containerId3 = createContainerId(appId, 3); @@ -109,7 +119,7 @@ public class TestTaskAttemptListenerImplTezDag { AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false); taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3); taskAttemptListener.unregisterRunningContainer(containerId3); - containerTask = taskAttemptListener.getTask(containerContext3); + containerTask = tezUmbilical.getTask(containerContext3); assertTrue(containerTask.shouldDie()); } @@ -125,10 +135,18 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(dag).when(appContext).getCurrentDAG(); doReturn(appAcls).when(appContext).getApplicationACLs(); doReturn(amContainerMap).when(appContext).getAllContainers(); + NodeId nodeId = NodeId.newInstance("localhost", 0); + AMContainer amContainer = mock(AMContainer.class); + Container container = mock(Container.class); + doReturn(nodeId).when(container).getNodeId(); + doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class)); + doReturn(container).when(amContainer).getContainer(); TaskAttemptListenerImpTezDag taskAttemptListener = - new TaskAttemptListenerImplForTest(appContext, mock(TaskHeartbeatHandler.class), + new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null); + TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(); + TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); TaskSpec taskSpec = mock(TaskSpec.class); @@ -139,20 +157,19 @@ public class TestTaskAttemptListenerImplTezDag { ContainerId containerId1 = createContainerId(appId, 1); - doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1); ContainerContext containerContext1 = new ContainerContext(containerId1.toString()); taskAttemptListener.registerRunningContainer(containerId1); - containerTask = taskAttemptListener.getTask(containerContext1); + containerTask = tezUmbilical.getTask(containerContext1); assertNull(containerTask); // Register task taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1); - containerTask = taskAttemptListener.getTask(containerContext1); + containerTask = tezUmbilical.getTask(containerContext1); assertFalse(containerTask.shouldDie()); assertEquals(taskSpec, containerTask.getTaskSpec()); // Try pulling again - simulates re-use pull - containerTask = taskAttemptListener.getTask(containerContext1); + containerTask = tezUmbilical.getTask(containerContext1); assertNull(containerTask); } @@ -162,13 +179,11 @@ public class TestTaskAttemptListenerImplTezDag { return containerId; } - private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag { + private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl { - public TaskAttemptListenerImplForTest(AppContext context, - TaskHeartbeatHandler thh, - ContainerHeartbeatHandler chh, - JobTokenSecretManager jobTokenSecretManager) { - super(context, thh, chh, jobTokenSecretManager); + public TezTaskCommunicatorImplForTest( + TaskCommunicatorContext taskCommunicatorContext) { + super(taskCommunicatorContext); } @Override