Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BDC7717388 for ; Tue, 7 Apr 2015 20:12:20 +0000 (UTC) Received: (qmail 87987 invoked by uid 500); 7 Apr 2015 20:12:20 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 87898 invoked by uid 500); 7 Apr 2015 20:12:20 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 86565 invoked by uid 99); 7 Apr 2015 20:12:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Apr 2015 20:12:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AB715E2F1A; Tue, 7 Apr 2015 20:12:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Tue, 07 Apr 2015 20:12:48 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/35] tez git commit: TEZ-2123. Fix component managers to use pluggable components. Enable hybrid mode. (sseth) TEZ-2123. Fix component managers to use pluggable components. Enable hybrid mode. 
(sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/45756504 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/45756504 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/45756504 Branch: refs/heads/TEZ-2003 Commit: 4575650452b6b8bb943af7b43fc0144d222fd1c7 Parents: 6a26d15 Author: Siddharth Seth Authored: Fri Feb 20 11:59:03 2015 -0800 Committer: Siddharth Seth Committed: Tue Apr 7 13:11:44 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 4 +- .../apache/tez/dag/app/TaskAttemptListener.java | 12 +- .../dag/app/TaskAttemptListenerImpTezDag.java | 31 ++-- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 4 +- .../TezRootInputInitializerContextImpl.java | 2 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +- .../tez/dag/app/dag/impl/VertexManager.java | 2 +- .../app/launcher/ContainerLauncherRouter.java | 2 +- .../app/launcher/LocalContainerLauncher.java | 10 +- .../rm/AMSchedulerEventDeallocateContainer.java | 7 +- .../rm/AMSchedulerEventNodeBlacklistUpdate.java | 8 +- .../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 10 +- .../dag/app/rm/LocalTaskSchedulerService.java | 19 ++- .../tez/dag/app/rm/NMCommunicatorEvent.java | 12 +- .../rm/NMCommunicatorLaunchRequestEvent.java | 11 +- .../app/rm/NMCommunicatorStopRequestEvent.java | 4 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 151 ++++++++++++----- .../tez/dag/app/rm/container/AMContainer.java | 3 + .../AMContainerEventLaunchRequest.java | 15 +- .../dag/app/rm/container/AMContainerImpl.java | 39 +++-- .../dag/app/rm/container/AMContainerMap.java | 4 +- .../apache/tez/dag/app/rm/node/AMNodeImpl.java | 6 +- .../apache/tez/dag/app/MockDAGAppMaster.java | 2 +- .../app/TestTaskAttemptListenerImplTezDag.java | 22 +-- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 69 ++++---- .../tez/dag/app/dag/impl/TestVertexImpl.java | 8 +- 
.../tez/dag/app/rm/TestContainerReuse.java | 34 ++-- .../tez/dag/app/rm/TestLocalTaskScheduler.java | 2 +- .../app/rm/TestLocalTaskSchedulerService.java | 18 ++- .../app/rm/TestTaskSchedulerEventHandler.java | 11 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 2 +- .../dag/app/rm/container/TestAMContainer.java | 108 +++++++------ .../app/rm/container/TestAMContainerMap.java | 6 +- .../org/apache/tez/examples/JoinValidate.java | 30 +++- .../TezTestServiceContainerLauncher.java | 5 +- .../rm/TezTestServiceTaskSchedulerService.java | 100 ++---------- .../tez/examples/JoinValidateConfigured.java | 53 ++++++ .../tez/tests/TestExternalTezServices.java | 160 ++++++++++++++----- 39 files changed, 630 insertions(+), 359 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 4bfe08f..1a2264c 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -4,5 +4,6 @@ ALL CHANGES: TEZ-2090. Add tests for jobs running in external services. TEZ-2117. Add a manager for ContainerLaunchers running in the AM. TEZ-2122. Setup pluggable components at AM/Vertex level. + TEZ-2123. Fix component managers to use pluggable components. 
(Enable hybrid mode) INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index cce945a..04329b2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -496,7 +496,7 @@ public class DAGAppMaster extends AbstractService { this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context, clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService, - taskSchedulerClassIdentifiers); + taskSchedulerClassIdentifiers, isLocal); addIfService(taskSchedulerEventHandler, true); if (enableWebUIService()) { @@ -2188,6 +2188,7 @@ public class DAGAppMaster extends AbstractService { // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT private String[] parsePlugins(BiMap pluginMap, String[] pluginStrings, String context) { + // TODO TEZ-2003 Duplicate error checking - ideally in the client itself. Depends on the final API. 
Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0, "Plugin strings should not be null or empty: " + context); @@ -2225,6 +2226,7 @@ public class DAGAppMaster extends AbstractService { } pluginMap.put(identifierString, index); classNames[index] = className; + index++; } return classNames; } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java index aeb0cd5..51f3ff7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.app; import java.net.InetSocketAddress; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; /** @@ -28,14 +29,13 @@ import org.apache.tez.dag.records.TezTaskAttemptID; */ public interface TaskAttemptListener { - InetSocketAddress getAddress(); + void registerRunningContainer(ContainerId containerId, int taskCommId); - void registerRunningContainer(ContainerId containerId); - - void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId); + void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId); - void unregisterRunningContainer(ContainerId containerId); + void unregisterRunningContainer(ContainerId containerId, int taskCommId); - void unregisterTaskAttempt(TezTaskAttemptID attemptID); + void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId); + TaskCommunicator getTaskCommunicator(int taskCommIndex); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index a465447..d8d186a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -251,15 +251,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements return task.canCommit(taskAttemptId); } - @Override - public InetSocketAddress getAddress() { - return taskCommunicators[0].getAddress(); - } - // The TaskAttemptListener register / unregister methods in this class are not thread safe. // The Tez framework should not invoke these methods from multiple threads. @Override - public void registerRunningContainer(ContainerId containerId) { + public void registerRunningContainer(ContainerId containerId, int taskCommId) { if (LOG.isDebugEnabled()) { LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener"); } @@ -269,11 +264,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements "Multiple registrations for containerId: " + containerId); } NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId(); - taskCommunicators[0].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort()); + taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(), + nodeId.getPort()); } @Override - public void unregisterRunningContainer(ContainerId containerId) { + public void unregisterRunningContainer(ContainerId containerId, int taskCommId) { if (LOG.isDebugEnabled()) { LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId); } @@ 
-281,12 +277,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements if (containerInfo.taskAttemptId != null) { registeredAttempts.remove(containerInfo.taskAttemptId); } - taskCommunicators[0].registerContainerEnd(containerId); + taskCommunicators[taskCommId].registerContainerEnd(containerId); } @Override public void registerTaskAttempt(AMContainerTask amContainerTask, - ContainerId containerId) { + ContainerId containerId, int taskCommId) { ContainerInfo containerInfo = registeredContainers.get(containerId); if (containerInfo == null) { throw new TezUncheckedException("Registering task attempt: " @@ -316,13 +312,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " when already assigned to: " + containerIdFromMap); } - taskCommunicators[0].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), + taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), amContainerTask.haveCredentialsChanged()); } @Override - public void unregisterTaskAttempt(TezTaskAttemptID attemptId) { + public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId) { ContainerId containerId = registeredAttempts.remove(attemptId); if (containerId == null) { LOG.warn("Unregister task attempt: " + attemptId + " from unknown container"); @@ -336,7 +332,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map. 
registeredContainers.put(containerId, NULL_CONTAINER_INFO); - taskCommunicators[0].unregisterRunningTaskAttempt(attemptId); + taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId); + } + + @Override + public TaskCommunicator getTaskCommunicator(int taskCommIndex) { + return taskCommunicators[taskCommIndex]; } private void pingContainerHeartbeatHandler(ContainerId containerId) { @@ -352,8 +353,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements + ", ContainerId not known for this attempt"); } } - - public TaskCommunicator getTaskCommunicator() { - return taskCommunicators[0]; - } } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index e4705ea..e1851c4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1207,7 +1207,7 @@ public class TaskAttemptImpl implements TaskAttempt, // Inform the scheduler if (sendSchedulerEvent()) { ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper - .getTaskAttemptState())); + .getTaskAttemptState(), ta.getVertex().getTaskSchedulerIdentifier())); } } } @@ -1288,7 +1288,7 @@ public class TaskAttemptImpl implements TaskAttempt, // Inform the Scheduler. ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, - TaskAttemptState.SUCCEEDED)); + TaskAttemptState.SUCCEEDED, ta.getVertex().getTaskSchedulerIdentifier())); // Inform the task. 
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java index d4ef4d5..4ca4024 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java @@ -96,7 +96,7 @@ public class TezRootInputInitializerContextImpl implements @Override public Resource getTotalAvailableResource() { - return appContext.getTaskScheduler().getTotalResources(); + return appContext.getTaskScheduler().getTotalResources(vertex.getTaskSchedulerIdentifier()); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 1b7a415..33d6b49 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -4016,7 +4016,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, eventHandler, getTotalTasks(), appContext.getTaskScheduler().getNumClusterNodes(), getTaskResource(), - appContext.getTaskScheduler().getTotalResources()); + appContext.getTaskScheduler().getTotalResources(taskSchedulerIdentifier)); List> inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size()); for (String inputName : inputsWithInitializers) { 
http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 95d714b..e4d5489 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -243,7 +243,7 @@ public class VertexManager { @Override public synchronized Resource getTotalAvailableResource() { checkAndThrowIfDone(); - return appContext.getTaskScheduler().getTotalResources(); + return appContext.getTaskScheduler().getTotalResources(managedVertex.getTaskSchedulerIdentifier()); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index 621e4a8..4f9b5bf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -124,6 +124,6 @@ public class ContainerLauncherRouter extends AbstractService @Override public void handle(NMCommunicatorEvent event) { - containerLaunchers[0].handle(event); + containerLaunchers[event.getLauncherId()].handle(event); } } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index b019875..6db48cb 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -58,7 +58,6 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; -import org.apache.tez.dag.app.TaskAttemptListenerImpTezDag; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; @@ -87,9 +86,9 @@ public class LocalContainerLauncher extends AbstractService implements private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class); private final AppContext context; - private final TezTaskUmbilicalProtocol taskUmbilicalProtocol; private final AtomicBoolean serviceStopped = new AtomicBoolean(false); private final String workingDirectory; + private final TaskAttemptListener tal; private final Map localEnv = new HashMap(); private final ExecutionContext executionContext; @@ -114,9 +113,8 @@ public class LocalContainerLauncher extends AbstractService implements String workingDirectory) throws UnknownHostException { super(LocalContainerLauncher.class.getName()); this.context = context; - TaskAttemptListenerImpTezDag taListener = (TaskAttemptListenerImpTezDag)taskAttemptListener; - TezTaskCommunicatorImpl taskComm = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(); - this.taskUmbilicalProtocol = taskComm.getUmbilical(); + this.tal = taskAttemptListener; + this.workingDirectory = workingDirectory; AuxiliaryServiceHelper.setServiceDataIntoEnv( ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, 
ByteBuffer.allocate(4).putInt(0), localEnv); @@ -209,7 +207,7 @@ public class LocalContainerLauncher extends AbstractService implements tezChild = createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier, context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(), - taskUmbilicalProtocol, + ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(event.getTaskCommId())).getUmbilical(), TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array())); } catch (InterruptedException e) { handleLaunchFailed(e, event.getContainerId()); http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java index 1b51920..5270aa2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java @@ -23,15 +23,20 @@ import org.apache.hadoop.yarn.api.records.ContainerId; public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent { private final ContainerId containerId; + private final int schedulerId; - public AMSchedulerEventDeallocateContainer(ContainerId containerId) { + public AMSchedulerEventDeallocateContainer(ContainerId containerId, int schedulerId) { super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE); this.containerId = containerId; + this.schedulerId = schedulerId; } public ContainerId getContainerId() { return this.containerId; } + public int getSchedulerId() { + return schedulerId; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java index ed7ebc3..679705a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java @@ -23,14 +23,20 @@ import org.apache.hadoop.yarn.api.records.NodeId; public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent { private final NodeId nodeId; + private final int schedulerId; - public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add) { + public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add, int schedulerId) { super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED : AMSchedulerEventType.S_NODE_UNBLACKLISTED)); this.nodeId = nodeId; + this.schedulerId = schedulerId; } public NodeId getNodeId() { return this.nodeId; } + + public int getSchedulerId() { + return schedulerId; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java index 90e76b7..2ace642 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java @@ -26,14 +26,16 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent { private final TaskAttempt attempt; private final ContainerId containerId; - private TaskAttemptState state; + private final TaskAttemptState state; + private final int schedulerId; public 
AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId, - TaskAttemptState state) { + TaskAttemptState state, int schedulerId) { super(AMSchedulerEventType.S_TA_ENDED); this.attempt = attempt; this.containerId = containerId; this.state = state; + this.schedulerId = schedulerId; } public TezTaskAttemptID getAttemptID() { @@ -51,4 +53,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent { public ContainerId getUsedContainerId() { return this.containerId; } + + public int getSchedulerId() { + return schedulerId; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index ce01bfb..ecb7ad7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -63,10 +64,11 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { final int appHostPort; final String appTrackingUrl; final AppContext appContext; + final long customContainerAppId; public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient, ContainerSignatureMatcher containerSignatureMatcher, String appHostName, - int appHostPort, String appTrackingUrl, AppContext appContext) { + int appHostPort, String appTrackingUrl, long 
customContainerAppId, AppContext appContext) { super(LocalTaskSchedulerService.class.getName()); this.realAppClient = appClient; this.appCallbackExecutor = createAppCallbackExecutorService(); @@ -78,6 +80,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { this.appContext = appContext; taskRequestQueue = new PriorityBlockingQueue(); taskAllocations = new LinkedHashMap(); + this.customContainerAppId = customContainerAppId; } private ExecutorService createAppCallbackExecutorService() { @@ -164,7 +167,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) { return new AsyncDelegateRequestHandler(taskRequestQueue, - new LocalContainerFactory(appContext), + new LocalContainerFactory(appContext, customContainerAppId), taskAllocations, appClientDelegate, conf); @@ -195,17 +198,19 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { } static class LocalContainerFactory { - final AppContext appContext; AtomicInteger nextId; + final ApplicationAttemptId customAppAttemptId; - public LocalContainerFactory(AppContext appContext) { - this.appContext = appContext; + public LocalContainerFactory(AppContext appContext, long appIdLong) { this.nextId = new AtomicInteger(1); + ApplicationId appId = ApplicationId + .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId()); + this.customAppAttemptId = ApplicationAttemptId + .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId()); } public Container createContainer(Resource capability, Priority priority) { - ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId(); - ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement()); + ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement()); NodeId nodeId = NodeId.newInstance("127.0.0.1", 0); String nodeHttpAddress = 
"127.0.0.1:0"; http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java index 8bdeb28..f86894f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java @@ -28,13 +28,15 @@ public class NMCommunicatorEvent extends AbstractEvent private final ContainerId containerId; private final NodeId nodeId; private final Token containerToken; + private final int launcherId; public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId, - Token containerToken, NMCommunicatorEventType type) { + Token containerToken, NMCommunicatorEventType type, int launcherId) { super(type); this.containerId = containerId; this.nodeId = nodeId; this.containerToken = containerToken; + this.launcherId = launcherId; } public ContainerId getContainerId() { @@ -48,10 +50,14 @@ public class NMCommunicatorEvent extends AbstractEvent public Token getContainerToken() { return this.containerToken; } - + + public int getLauncherId() { + return launcherId; + } + public String toSrting() { return super.toString() + " for container " + containerId + ", nodeId: " - + nodeId; + + nodeId + ", launcherId: " + launcherId; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java index c3b12c0..a38345c 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java @@ -25,13 +25,16 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent { private final ContainerLaunchContext clc; private final Container container; + // The task communicator index for the specific container being launched. + private final int taskCommId; public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc, - Container container) { + Container container, int launcherId, int taskCommId) { super(container.getId(), container.getNodeId(), container - .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST); + .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, launcherId); this.clc = clc; this.container = container; + this.taskCommId = taskCommId; } public ContainerLaunchContext getContainerLaunchContext() { @@ -42,6 +45,10 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent { return container; } + public int getTaskCommId() { + return taskCommId; + } + @Override public boolean equals(Object o) { if (this == o) { http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java index 277d1e7..c9b5c44 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java @@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.api.records.Token; public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent { public NMCommunicatorStopRequestEvent(ContainerId 
containerId, NodeId nodeId, - Token containerToken) { + Token containerToken, int launcherId) { super(containerId, nodeId, containerToken, - NMCommunicatorEventType.CONTAINER_STOP_REQUEST); + NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId); } } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index caa41df..b69c81a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -110,9 +110,22 @@ public class TaskSchedulerEventHandler extends AbstractService private final String[] taskSchedulerClasses; protected final TaskSchedulerService []taskSchedulers; + private final boolean isPureLocalMode; + // If running in non local-only mode, the YARN task scheduler will always run to take care of + // registration with YARN and heartbeats to YARN. + // Splitting registration and heartbeats is not straightforward due to the taskScheduler being + // tied to a ContainerRequestType. + private final int yarnTaskSchedulerIndex; + // Custom AppIds to avoid container conflicts if there are multiple sources + private final long SCHEDULER_APP_ID_BASE = 111101111; + private final long SCHEDULER_APP_ID_INCREMENT = 111111111; + BlockingQueue eventQueue = new LinkedBlockingQueue(); + // Not tracking container / task to schedulerId. Instead relying on everything flowing through + // the system and being propagated back via events. 
+ /** * * @param appContext @@ -127,7 +140,7 @@ public class TaskSchedulerEventHandler extends AbstractService public TaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI, - String [] schedulerClasses) { + String [] schedulerClasses, boolean isPureLocalMode) { super(TaskSchedulerEventHandler.class.getName()); this.appContext = appContext; this.eventHandler = eventHandler; @@ -135,13 +148,39 @@ public class TaskSchedulerEventHandler extends AbstractService this.containerSignatureMatcher = containerSignatureMatcher; this.webUI = webUI; this.historyUrl = getHistoryUrl(); + this.isPureLocalMode = isPureLocalMode; if (this.webUI != null) { this.webUI.setHistoryUrl(this.historyUrl); } - if (schedulerClasses == null || schedulerClasses.length == 0) { - this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + + // Override everything for pure local mode + if (isPureLocalMode) { + this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT}; + this.yarnTaskSchedulerIndex = -1; } else { - this.taskSchedulerClasses = schedulerClasses; + if (schedulerClasses == null || schedulerClasses.length ==0) { + this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + this.yarnTaskSchedulerIndex = 0; + } else { + // Ensure the YarnScheduler will be set up and note its index. This will be responsible for heartbeats and YARN registration. + int foundYarnTaskSchedulerIndex = -1; + for (int i = 0 ; i < schedulerClasses.length ; i++) { + if (schedulerClasses[i].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { + foundYarnTaskSchedulerIndex = i; + break; + } + } + if (foundYarnTaskSchedulerIndex == -1) { // Not found. Add at the end.
+ this.taskSchedulerClasses = new String[schedulerClasses.length+1]; + foundYarnTaskSchedulerIndex = this.taskSchedulerClasses.length -1; + for (int i = 0 ; i < schedulerClasses.length ; i++) { // Copy over the rest. + this.taskSchedulerClasses[i] = schedulerClasses[i]; + } + } else { + this.taskSchedulerClasses = schedulerClasses; + } + this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex; + } } taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length]; } @@ -159,12 +198,12 @@ public class TaskSchedulerEventHandler extends AbstractService return cachedNodeCount; } - public Resource getAvailableResources() { - return taskSchedulers[0].getAvailableResources(); + public Resource getAvailableResources(int schedulerId) { + return taskSchedulers[schedulerId].getAvailableResources(); } - public Resource getTotalResources() { - return taskSchedulers[0].getTotalResources(); + public Resource getTotalResources(int schedulerId) { + return taskSchedulers[schedulerId].getTotalResources(); } public synchronized void handleEvent(AMSchedulerEvent sEvent) { @@ -178,7 +217,7 @@ public class TaskSchedulerEventHandler extends AbstractService switch(event.getState()) { case FAILED: case KILLED: - handleTAUnsuccessfulEnd((AMSchedulerEventTAEnded) sEvent); + handleTAUnsuccessfulEnd(event); break; case SUCCEEDED: handleTASucceeded(event); @@ -230,9 +269,9 @@ public class TaskSchedulerEventHandler extends AbstractService private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) { if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) { - taskSchedulers[0].blacklistNode(event.getNodeId()); + taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId()); } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) { - taskSchedulers[0].unblacklistNode(event.getNodeId()); + taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId()); } else { throw new TezUncheckedException("Invalid event type: " + 
event.getType()); } @@ -244,14 +283,14 @@ public class TaskSchedulerEventHandler extends AbstractService // TODO what happens to the task that was connected to this container? // current assumption is that it will eventually call handleTaStopRequest //TaskAttempt taskAttempt = (TaskAttempt) - taskSchedulers[0].deallocateContainer(containerId); + taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId); // TODO does this container need to be stopped via C_STOP_REQUEST sendEvent(new AMContainerEventStopRequest(containerId)); } private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) { TaskAttempt attempt = event.getAttempt(); - boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, false); + boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, false); // use stored value of container id in case the scheduler has removed this // assignment because the task has been deallocated earlier. // retroactive case @@ -293,7 +332,8 @@ public class TaskSchedulerEventHandler extends AbstractService event.getAttemptID())); } - boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, true); + boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, + true); if (!wasContainerAllocated) { LOG.error("De-allocated successful task: " + attempt.getID() + ", but TaskScheduler reported no container assigned to task"); @@ -318,7 +358,7 @@ public class TaskSchedulerEventHandler extends AbstractService TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt(); if (affinityAttempt != null) { Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID()); - taskSchedulers[0].allocateTask(taskAttempt, + taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt, event.getCapability(), affinityAttempt.getAssignedContainerID(), Priority.newInstance(event.getPriority()), @@ -338,7 +378,7 @@ public class 
TaskSchedulerEventHandler extends AbstractService } } - taskSchedulers[0].allocateTask(taskAttempt, + taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt, event.getCapability(), hosts, racks, @@ -349,7 +389,8 @@ public class TaskSchedulerEventHandler extends AbstractService private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl, AppContext appContext, - String schedulerClassName) { + String schedulerClassName, + long customAppIdIdentifier) { if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Creating TaskScheduler: YarnTaskSchedulerService"); return new YarnTaskSchedulerService(this, this.containerSignatureMatcher, @@ -357,7 +398,7 @@ public class TaskSchedulerEventHandler extends AbstractService } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Creating TaskScheduler: Local TaskScheduler"); return new LocalTaskSchedulerService(this, this.containerSignatureMatcher, - host, port, trackingUrl, appContext); + host, port, trackingUrl, customAppIdIdentifier, appContext); } else { LOG.info("Creating custom TaskScheduler: " + schedulerClassName); // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface. 
@@ -366,9 +407,10 @@ public class TaskSchedulerEventHandler extends AbstractService try { Constructor ctor = taskSchedulerClazz .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class, - int.class, String.class, Configuration.class); + int.class, String.class, long.class, Configuration.class); ctor.setAccessible(true); - return ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig()); + return ctor.newInstance(this, appContext, host, port, trackingUrl, customAppIdIdentifier, + getConfig()); } catch (NoSuchMethodException e) { throw new TezUncheckedException(e); } catch (InvocationTargetException e) { @@ -383,10 +425,19 @@ public class TaskSchedulerEventHandler extends AbstractService @VisibleForTesting protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { + // TODO Add error checking for components being used in the Vertex when running in pure local mode. // Iterate over the list and create all the taskSchedulers + int j = 0; for (int i = 0; i < taskSchedulerClasses.length; i++) { + long customAppIdIdentifier; + if (isPureLocalMode || taskSchedulerClasses[i].equals( + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { // Use the app identifier from the appId. 
+ customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp(); + } else { + customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT); + } taskSchedulers[i] = createTaskScheduler(host, port, - trackingUrl, appContext, taskSchedulerClasses[i]); + trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier); } } @@ -405,12 +456,12 @@ public class TaskSchedulerEventHandler extends AbstractService for (int i = 0 ; i < taskSchedulers.length ; i++) { taskSchedulers[i].init(getConfig()); taskSchedulers[i].start(); - } - - // TODO TEZ-2118 Start using multiple task schedulers - if (shouldUnregisterFlag.get()) { - // Flag may have been set earlier when task scheduler was not initialized - taskSchedulers[0].setShouldUnregister(); + if (shouldUnregisterFlag.get()) { + // Flag may have been set earlier when task scheduler was not initialized + // TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ? + // External services could need to talk to some other entity. 
+ taskSchedulers[i].setShouldUnregister(); + } } this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") { @@ -459,8 +510,10 @@ public class TaskSchedulerEventHandler extends AbstractService if (eventHandlingThread != null) eventHandlingThread.interrupt(); } - if (taskSchedulers[0] != null) { - ((AbstractService)taskSchedulers[0]).stop(); + for (int i = 0 ; i < taskSchedulers.length ; i++) { + if (taskSchedulers[i] != null) { + taskSchedulers[i].stop(); + } } } @@ -469,15 +522,18 @@ public class TaskSchedulerEventHandler extends AbstractService public synchronized void taskAllocated(Object task, Object appCookie, Container container) { + AMSchedulerEventTALaunchRequest event = + (AMSchedulerEventTALaunchRequest) appCookie; ContainerId containerId = container.getId(); - if (appContext.getAllContainers().addContainerIfNew(container)) { + if (appContext.getAllContainers() + .addContainerIfNew(container, event.getSchedulerId(), event.getLauncherId(), + event.getTaskCommId())) { appContext.getNodeTracker().nodeSeen(container.getNodeId()); sendEvent(new AMNodeEventContainerAllocated(container .getNodeId(), container.getId())); } - AMSchedulerEventTALaunchRequest event = - (AMSchedulerEventTALaunchRequest) appCookie; + TaskAttempt taskAttempt = event.getTaskAttempt(); // TODO - perhaps check if the task still needs this container // because the deallocateTask downcall may have raced with the @@ -486,7 +542,7 @@ public class TaskSchedulerEventHandler extends AbstractService if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) { sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(), - event.getContainerContext())); + event.getContainerContext(), event.getLauncherId(), event.getTaskCommId())); } sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container)); sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(), @@ -605,6 +661,9 @@ public class 
TaskSchedulerEventHandler extends AbstractService public float getProgress() { // at this point allocate has been called and so node count must be available // may change after YARN-1722 + // This is a heartbeat from the scheduler into the APP, and is being used to piggy-back + // node updates from the cluster. + // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in. int nodeCount = taskSchedulers[0].getClusterNodeCount(); if (nodeCount != cachedNodeCount) { cachedNodeCount = nodeCount; @@ -620,12 +679,17 @@ public class TaskSchedulerEventHandler extends AbstractService } public void dagCompleted() { - taskSchedulers[0].resetMatchLocalityForAllHeldContainers(); + for (int i = 0 ; i < taskSchedulers.length ; i++) { + taskSchedulers[i].resetMatchLocalityForAllHeldContainers(); + } } @Override public void preemptContainer(ContainerId containerId) { - taskSchedulers[0].deallocateContainer(containerId); + // TODO Why is this making a call back into the scheduler, when the call is originating from there? + // An AMContainer instance should already exist if an attempt is being made to preempt it + AMContainer amContainer = appContext.getAllContainers().get(containerId); + taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId); // Inform the Containers about completion. sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID, "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION)); @@ -634,13 +698,24 @@ public class TaskSchedulerEventHandler extends AbstractService public void setShouldUnregisterFlag() { LOG.info("TaskScheduler notified that it should unregister from RM"); this.shouldUnregisterFlag.set(true); - if (this.taskSchedulers[0] != null) { - this.taskSchedulers[0].setShouldUnregister(); + for (int i = 0 ; i < taskSchedulers.length ; i++) { + if (this.taskSchedulers[i] != null) { + // TODO TEZ-2003 registration required for all schedulers ?
+ this.taskSchedulers[i].setShouldUnregister(); + } } } public boolean hasUnregistered() { - return this.taskSchedulers[0].hasUnregistered(); + boolean result = true; + for (int i = 0 ; i < taskSchedulers.length ; i++) { + // TODO TEZ-2003 registration required for all schedulers ? + result |= this.taskSchedulers[i].hasUnregistered(); + if (result == false) { + return result; + } + } + return result; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java index 0fc2e12..6616896 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java @@ -34,4 +34,7 @@ public interface AMContainer extends EventHandler{ public List getAllTaskAttempts(); public TezTaskAttemptID getCurrentTaskAttempt(); + public int getTaskSchedulerIdentifier(); + public int getContainerLauncherIdentifier(); + public int getTaskCommunicatorIdentifier(); } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java index d973264..92e5817 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java @@ -27,12 +27,17 @@ public class AMContainerEventLaunchRequest extends 
AMContainerEvent { private final TezVertexID vertexId; private final ContainerContext containerContext; + private final int launcherId; + private final int taskCommId; public AMContainerEventLaunchRequest(ContainerId containerId, - TezVertexID vertexId, ContainerContext containerContext) { + TezVertexID vertexId, ContainerContext containerContext, + int launcherId, int taskCommId) { super(containerId, AMContainerEventType.C_LAUNCH_REQUEST); this.vertexId = vertexId; this.containerContext = containerContext; + this.launcherId = launcherId; + this.taskCommId = taskCommId; } public TezDAGID getDAGId() { @@ -46,4 +51,12 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent { public ContainerContext getContainerContext() { return this.containerContext; } + + public int getLauncherId() { + return launcherId; + } + + public int getTaskCommId() { + return taskCommId; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 1acec9c..39df2e8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -81,6 +81,9 @@ public class AMContainerImpl implements AMContainer { private final TaskAttemptListener taskAttemptListener; protected final EventHandler eventHandler; private final ContainerSignatureMatcher signatureMatcher; + private final int schedulerId; + private final int launcherId; + private final int taskCommId; private final List completedAttempts = new LinkedList(); @@ -302,7 +305,7 @@ public class AMContainerImpl implements AMContainer { // additional change - JvmID, YarnChild, etc depend on TaskType. 
public AMContainerImpl(Container container, ContainerHeartbeatHandler chh, TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher, - AppContext appContext) { + AppContext appContext, int schedulerId, int launcherId, int taskCommId) { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -314,6 +317,9 @@ public class AMContainerImpl implements AMContainer { this.containerHeartbeatHandler = chh; this.taskAttemptListener = tal; this.failedAssignments = new LinkedList(); + this.schedulerId = schedulerId; + this.launcherId = launcherId; + this.taskCommId = taskCommId; this.stateMachine = stateMachineFactory.make(this); } @@ -363,6 +369,21 @@ public class AMContainerImpl implements AMContainer { } } + @Override + public int getTaskSchedulerIdentifier() { + return this.schedulerId; + } + + @Override + public int getContainerLauncherIdentifier() { + return this.launcherId; + } + + @Override + public int getTaskCommunicatorIdentifier() { + return this.taskCommId; + } + public boolean isInErrorState() { return inError; } @@ -432,7 +453,7 @@ public class AMContainerImpl implements AMContainer { containerContext.getLocalResources(), containerContext.getEnvironment(), containerContext.getJavaOpts(), - container.taskAttemptListener.getAddress(), containerContext.getCredentials(), + container.taskAttemptListener.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(), container.appContext, container.container.getResource(), container.appContext.getAMConf()); @@ -1014,7 +1035,7 @@ public class AMContainerImpl implements AMContainer { } protected void deAllocate() { - sendEvent(new AMSchedulerEventDeallocateContainer(containerId)); + sendEvent(new AMSchedulerEventDeallocateContainer(containerId, schedulerId)); } protected void sendTerminatedToTaskAttempt( @@ -1044,28 +1065,28 @@ public class AMContainerImpl implements AMContainer { } protected void 
sendStartRequestToNM(ContainerLaunchContext clc) { - sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container)); + sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, taskCommId)); } protected void sendStopRequestToNM() { sendEvent(new NMCommunicatorStopRequestEvent(containerId, - container.getNodeId(), container.getContainerToken())); + container.getNodeId(), container.getContainerToken(), launcherId)); } protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) { - taskAttemptListener.unregisterTaskAttempt(attemptId); + taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId); } protected void registerAttemptWithListener(AMContainerTask amContainerTask) { - taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId); + taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId, taskCommId); } protected void registerWithTAListener() { - taskAttemptListener.registerRunningContainer(containerId); + taskAttemptListener.registerRunningContainer(containerId, taskCommId); } protected void unregisterFromTAListener() { - this.taskAttemptListener.unregisterRunningContainer(containerId); + this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId); } protected void registerWithContainerListener() { http://git-wip-us.apache.org/repos/asf/tez/blob/45756504/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java index 6037a3a..d67de3d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java @@ -61,9 +61,9 @@ public class AMContainerMap extends AbstractService implements EventHandler