Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 67CF418E1F for ; Sat, 22 Aug 2015 01:18:52 +0000 (UTC) Received: (qmail 10398 invoked by uid 500); 22 Aug 2015 01:18:52 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 10293 invoked by uid 500); 22 Aug 2015 01:18:52 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 8851 invoked by uid 99); 22 Aug 2015 01:18:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 01:18:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 863B1E0415; Sat, 22 Aug 2015 01:18:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 22 Aug 2015 01:19:13 -0000 Message-Id: <49f0f6be880a40c580efabbbb4b9ff4b@git.apache.org> In-Reply-To: <874e92d7e5584fc6929ed08fa7b87588@git.apache.org> References: <874e92d7e5584fc6929ed08fa7b87588@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [23/50] [abbrv] tez git commit: TEZ-2124. Change Node tracking to work per external container source. (sseth) TEZ-2124. Change Node tracking to work per external container source. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3a143a67 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3a143a67 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3a143a67 Branch: refs/heads/TEZ-2003 Commit: 3a143a67187b29f792c43b9acc40feec0dc16697 Parents: 9e94757 Author: Siddharth Seth Authored: Thu Jul 16 14:18:35 2015 -0700 Committer: Siddharth Seth Committed: Fri Aug 21 18:13:55 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 2 + .../apache/tez/dag/app/dag/impl/TaskImpl.java | 3 +- .../dag/app/launcher/ContainerLauncherImpl.java | 5 +- .../app/rm/TaskSchedulerAppCallbackImpl.java | 89 +++++++++ .../dag/app/rm/TaskSchedulerEventHandler.java | 71 +++---- .../apache/tez/dag/app/rm/node/AMNodeEvent.java | 8 +- .../rm/node/AMNodeEventContainerAllocated.java | 4 +- .../rm/node/AMNodeEventNodeCountUpdated.java | 4 +- .../app/rm/node/AMNodeEventStateChanged.java | 4 +- .../rm/node/AMNodeEventTaskAttemptEnded.java | 4 +- .../node/AMNodeEventTaskAttemptSucceeded.java | 4 +- .../apache/tez/dag/app/rm/node/AMNodeImpl.java | 6 +- .../tez/dag/app/rm/node/AMNodeTracker.java | 162 +++++----------- .../dag/app/rm/node/PerSourceNodeTracker.java | 187 +++++++++++++++++++ .../apache/tez/dag/app/MockDAGAppMaster.java | 2 +- .../tez/dag/app/TestMockDAGAppMaster.java | 2 +- .../tez/dag/app/rm/TestContainerReuse.java | 62 +++--- .../app/rm/TestTaskSchedulerEventHandler.java | 11 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 2 +- .../tez/dag/app/rm/node/TestAMNodeTracker.java | 64 ++++--- 21 files changed, 462 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 590fe7f..604947c 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -33,5 +33,6 @@ ALL CHANGES: TEZ-2508. rebase 06/01 TEZ-2526. Fix version for tez-history-parser. TEZ-2621. rebase 07/14 + TEZ-2124. Change Node tracking to work per external container source. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index e37fc2f..ec2ef66 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1440,9 +1440,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, VertexImpl v = createVertex(this, vertexName, i); addVertex(v); } + // check task resources, only check it in non-local mode if (!appContext.isLocal()) { for (Vertex v : vertexMap.values()) { + // TODO TEZ-2003 (post) Ideally, this should be per source. if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) { String msg = "Vertex's TaskResource is beyond the cluster container capability," + "Vertex=" + v.getLogIdentifier() +", Requested TaskResource=" + v.getTaskResource() http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 93b4c3f..1b55295 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -1396,7 +1396,8 @@ public class TaskImpl implements Task, EventHandler { if (amContainer != null) { // inform the node about failure task.eventHandler.handle( - new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(), + new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(), + task.getVertex().getTaskSchedulerIdentifier(), containerId, failedAttemptId, true)); } } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java index a1eb2a7..a12fb04 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.dag.api.TezConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -292,7 +293,9 @@ public class ContainerLauncherImpl extends AbstractService implements // nodes where containers will run at *this* point of time. This is // *not* the cluster size and doesn't need to be. - int numNodes = context.getNodeTracker().getNumNodes(); + int yarnSourceIndex = + context.getTaskScheduerIdentifier(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + int numNodes = context.getNodeTracker().getNumNodes(yarnSourceIndex); int idealPoolSize = Math.min(limitOnPoolSize, numNodes); if (poolSize < idealPoolSize) { http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java new file mode 100644 index 0000000..ea37e94 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java @@ -0,0 +1,89 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; + +public class TaskSchedulerAppCallbackImpl implements TaskSchedulerService.TaskSchedulerAppCallback{ + + private final TaskSchedulerEventHandler tseh; + private final int schedulerId; + + public TaskSchedulerAppCallbackImpl(TaskSchedulerEventHandler tseh, int schedulerId) { + this.tseh = tseh; + this.schedulerId = schedulerId; + } + + @Override + public void taskAllocated(Object task, Object appCookie, Container container) { + tseh.taskAllocated(schedulerId, task, appCookie, container); + } + + @Override + public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { + tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus); + } + + @Override + public void containerBeingReleased(ContainerId containerId) { + tseh.containerBeingReleased(schedulerId, containerId); + } + + @Override + public void nodesUpdated(List updatedNodes) { + tseh.nodesUpdated(schedulerId, updatedNodes); + } + + @Override + public void appShutdownRequested() { + tseh.appShutdownRequested(schedulerId); + } + + @Override + public void setApplicationRegistrationData(Resource maxContainerCapability, + Map appAcls, + ByteBuffer clientAMSecretKey) { + tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey); + } + + @Override + public void onError(Throwable t) { + tseh.onError(schedulerId, t); + } + + @Override + public float getProgress() { + return tseh.getProgress(schedulerId); + } + + @Override + public void preemptContainer(ContainerId containerId) { + tseh.preemptContainer(schedulerId, containerId); + } + + @Override + public AppFinalStatus getFinalAppStatus() { + return tseh.getFinalAppStatus(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 4d1b43a..549db14 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; +import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -81,8 +82,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause; import com.google.common.base.Preconditions; -public class TaskSchedulerEventHandler extends AbstractService - implements TaskSchedulerAppCallback, +public class TaskSchedulerEventHandler extends AbstractService implements EventHandler { static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerEventHandler.class); @@ -315,7 +315,7 @@ public class TaskSchedulerEventHandler extends AbstractService // stopped. // AMNodeImpl blacklisting logic does not account for KILLED attempts. sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers(). - get(attemptContainerId).getContainer().getNodeId(), attemptContainerId, + get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(), attemptContainerId, attempt.getID(), event.getState() == TaskAttemptState.FAILED)); } } @@ -330,7 +330,7 @@ public class TaskSchedulerEventHandler extends AbstractService sendEvent(new AMContainerEventTASucceeded(usedContainerId, event.getAttemptID())); sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers(). - get(usedContainerId).getContainer().getNodeId(), usedContainerId, + get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId, event.getAttemptID())); } @@ -392,14 +392,16 @@ public class TaskSchedulerEventHandler extends AbstractService private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl, AppContext appContext, String schedulerClassName, - long customAppIdIdentifier) { + long customAppIdIdentifier, + int schedulerId) { + TaskSchedulerAppCallback appCallback = new TaskSchedulerAppCallbackImpl(this, schedulerId); if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Creating TaskScheduler: YarnTaskSchedulerService"); - return new YarnTaskSchedulerService(this, this.containerSignatureMatcher, + return new YarnTaskSchedulerService(appCallback, this.containerSignatureMatcher, host, port, trackingUrl, appContext); } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Creating TaskScheduler: Local TaskScheduler"); - return new LocalTaskSchedulerService(this, this.containerSignatureMatcher, + return new LocalTaskSchedulerService(appCallback, this.containerSignatureMatcher, host, port, trackingUrl, customAppIdIdentifier, appContext); } else { LOG.info("Creating custom TaskScheduler: " + schedulerClassName); @@ -411,7 +413,7 @@ public class TaskSchedulerEventHandler extends AbstractService .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class, int.class, String.class, long.class, Configuration.class); ctor.setAccessible(true); - return ctor.newInstance(this, appContext, host, port, trackingUrl, customAppIdIdentifier, + return ctor.newInstance(appCallback, appContext, host, port, trackingUrl, customAppIdIdentifier, getConfig()); } catch (NoSuchMethodException e) { throw new TezUncheckedException(e); @@ -441,7 +443,7 @@ public class TaskSchedulerEventHandler extends AbstractService LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" + customAppIdIdentifier); taskSchedulers[i] = createTaskScheduler(host, port, - trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier); + trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i); } } @@ -525,20 +527,21 @@ public class TaskSchedulerEventHandler extends AbstractService } } - // TaskSchedulerAppCallback methods - @Override - public synchronized void taskAllocated(Object task, + // TODO TEZ-2003 Consolidate TaskSchedulerAppCallback methods once these methods are moved into context + + // TaskSchedulerAppCallback methods with schedulerId, where relevant + public synchronized void taskAllocated(int schedulerId, Object task, Object appCookie, Container container) { AMSchedulerEventTALaunchRequest event = (AMSchedulerEventTALaunchRequest) appCookie; ContainerId containerId = container.getId(); if (appContext.getAllContainers() - .addContainerIfNew(container, event.getSchedulerId(), event.getLauncherId(), + .addContainerIfNew(container, schedulerId, event.getLauncherId(), event.getTaskCommId())) { - appContext.getNodeTracker().nodeSeen(container.getNodeId()); + appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId); sendEvent(new AMNodeEventContainerAllocated(container - .getNodeId(), container.getId())); + .getNodeId(), schedulerId, container.getId())); } @@ -558,8 +561,8 @@ public class TaskSchedulerEventHandler extends AbstractService .getContainerContext().getCredentials(), event.getPriority())); } - @Override - public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) { + public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) { + // SchedulerId isn't used here since no node updates are sent out // Inform the Containers about completion. AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId()); if (amContainer != null) { @@ -582,8 +585,8 @@ public class TaskSchedulerEventHandler extends AbstractService } } - @Override - public synchronized void containerBeingReleased(ContainerId containerId) { + public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) { + // SchedulerId isn't used here since no node updates are sent out AMContainer amContainer = appContext.getAllContainers().get(containerId); if (amContainer != null) { sendEvent(new AMContainerEventStopRequest(containerId)); @@ -591,28 +594,27 @@ public class TaskSchedulerEventHandler extends AbstractService } @SuppressWarnings("unchecked") - @Override - public synchronized void nodesUpdated(List updatedNodes) { + public synchronized void nodesUpdated(int schedulerId, List updatedNodes) { for (NodeReport nr : updatedNodes) { // Scheduler will find out from the node, if at all. // Relying on the RM to not allocate containers on an unhealthy node. - eventHandler.handle(new AMNodeEventStateChanged(nr)); + eventHandler.handle(new AMNodeEventStateChanged(nr, schedulerId)); } } - @Override - public synchronized void appShutdownRequested() { + public synchronized void appShutdownRequested(int schedulerId) { // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. - LOG.info("App shutdown requested by scheduler"); + LOG.info("App shutdown requested by scheduler {}", schedulerId); sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT)); } - @Override public synchronized void setApplicationRegistrationData( + int schedulerId, Resource maxContainerCapability, Map appAcls, ByteBuffer clientAMSecretKey) { + // TODO TEZ-2003 (post) Ideally clusterInfo should be available per source rather than a global view. this.appContext.getClusterInfo().setMaxContainerCapability( maxContainerCapability); this.appAcls = appAcls; @@ -623,7 +625,6 @@ public class TaskSchedulerEventHandler extends AbstractService // TaskScheduler uses a separate thread for it's callbacks. Since this method // returns a value which is required, the TaskScheduler wait for the call to // complete and can hence lead to a deadlock if called from within a TSEH lock. - @Override public AppFinalStatus getFinalAppStatus() { FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; StringBuffer sb = new StringBuffer(); @@ -665,24 +666,25 @@ public class TaskSchedulerEventHandler extends AbstractService // TaskScheduler uses a separate thread for it's callbacks. Since this method // returns a value which is required, the TaskScheduler wait for the call to // complete and can hence lead to a deadlock if called from within a TSEH lock. - @Override - public float getProgress() { + public float getProgress(int schedulerId) { // at this point allocate has been called and so node count must be available // may change after YARN-1722 // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and // node updates from the cluster. + + // Doubles as a mechanism to update node counts periodically. Hence schedulerId required. + // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in. int nodeCount = taskSchedulers[0].getClusterNodeCount(); if (nodeCount != cachedNodeCount) { cachedNodeCount = nodeCount; - sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount)); + sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId)); } return dagAppMaster.getProgress(); } - @Override - public void onError(Throwable t) { - LOG.info("Error reported by scheduler", t); + public void onError(int schedulerId, Throwable t) { + LOG.info("Error reported by scheduler {} - {}", schedulerId, t); sendEvent(new DAGAppMasterEventSchedulingServiceError(t)); } @@ -697,8 +699,7 @@ public class TaskSchedulerEventHandler extends AbstractService // the context has updated information. } - @Override - public void preemptContainer(ContainerId containerId) { + public void preemptContainer(int schedulerId, ContainerId containerId) { // TODO Why is this making a call back into the scheduler, when the call is originating from there. // An AMContainer instance should already exist if an attempt is being made to preempt it AMContainer amContainer = appContext.getAllContainers().get(containerId); http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java index a623cda..85bc513 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java @@ -24,13 +24,19 @@ import org.apache.hadoop.yarn.event.AbstractEvent; public class AMNodeEvent extends AbstractEvent { private final NodeId nodeId; + private final int sourceId; // Effectively the schedulerId - public AMNodeEvent(NodeId nodeId, AMNodeEventType type) { + public AMNodeEvent(NodeId nodeId, int sourceId, AMNodeEventType type) { super(type); this.nodeId = nodeId; + this.sourceId = sourceId; } public NodeId getNodeId() { return this.nodeId; } + + public int getSourceId() { + return sourceId; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java index 0770969..e250f42 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerAllocated.java @@ -24,8 +24,8 @@ public class AMNodeEventContainerAllocated extends AMNodeEvent { private final ContainerId containerId; - public AMNodeEventContainerAllocated(NodeId nodeId, ContainerId containerId) { - super(nodeId, AMNodeEventType.N_CONTAINER_ALLOCATED); + public AMNodeEventContainerAllocated(NodeId nodeId, int sourceId, ContainerId containerId) { + super(nodeId, sourceId, AMNodeEventType.N_CONTAINER_ALLOCATED); this.containerId = containerId; } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java index 86ca1fc..3b35daf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventNodeCountUpdated.java @@ -22,8 +22,8 @@ public class AMNodeEventNodeCountUpdated extends AMNodeEvent { private final int count; - public AMNodeEventNodeCountUpdated(int nodeCount) { - super(null, AMNodeEventType.N_NODE_COUNT_UPDATED); + public AMNodeEventNodeCountUpdated(int nodeCount, int sourceId) { + super(null, sourceId, AMNodeEventType.N_NODE_COUNT_UPDATED); this.count = nodeCount; } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java index ca4e5bd..b371ddd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventStateChanged.java @@ -23,8 +23,8 @@ public class AMNodeEventStateChanged extends AMNodeEvent { private NodeReport nodeReport; - public AMNodeEventStateChanged(NodeReport nodeReport) { - super(nodeReport.getNodeId(), + public AMNodeEventStateChanged(NodeReport nodeReport, int sourceId) { + super(nodeReport.getNodeId(), sourceId, (nodeReport.getNodeState().isUnusable() ? AMNodeEventType.N_TURNED_UNHEALTHY : AMNodeEventType.N_TURNED_HEALTHY)); http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java index c823236..4a4cb61 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptEnded.java @@ -27,9 +27,9 @@ public class AMNodeEventTaskAttemptEnded extends AMNodeEvent { private final ContainerId containerId; private final TezTaskAttemptID taskAttemptId; - public AMNodeEventTaskAttemptEnded(NodeId nodeId, ContainerId containerId, + public AMNodeEventTaskAttemptEnded(NodeId nodeId, int sourceId, ContainerId containerId, TezTaskAttemptID taskAttemptId, boolean failed) { - super(nodeId, AMNodeEventType.N_TA_ENDED); + super(nodeId, sourceId, AMNodeEventType.N_TA_ENDED); this.failed = failed; this.containerId = containerId; this.taskAttemptId = taskAttemptId; http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java index b07d594..2b8cb7d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventTaskAttemptSucceeded.java @@ -27,9 +27,9 @@ public class AMNodeEventTaskAttemptSucceeded extends AMNodeEvent { private final ContainerId containerId; private final TezTaskAttemptID taskAttemptId; - public AMNodeEventTaskAttemptSucceeded(NodeId nodeId, + public AMNodeEventTaskAttemptSucceeded(NodeId nodeId, int sourceId, ContainerId containerId, TezTaskAttemptID taskAttemptId) { - super(nodeId, AMNodeEventType.N_TA_SUCCEEDED); + super(nodeId, sourceId, AMNodeEventType.N_TA_SUCCEEDED); this.containerId = containerId; this.taskAttemptId = taskAttemptId; } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java index 0d8e4cd..88b36cb1f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java @@ -54,6 +54,7 @@ public class AMNodeImpl implements AMNode { private final ReadLock readLock; private final WriteLock writeLock; private final NodeId nodeId; + private final int sourceId; private final AppContext appContext; private final int maxTaskFailuresPerNode; private boolean blacklistingEnabled; @@ -172,13 +173,14 @@ public class AMNodeImpl implements AMNode { @SuppressWarnings("rawtypes") - public AMNodeImpl(NodeId nodeId, int maxTaskFailuresPerNode, + public AMNodeImpl(NodeId nodeId, int sourceId, int maxTaskFailuresPerNode, EventHandler eventHandler, boolean blacklistingEnabled, AppContext appContext) { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); this.nodeId = nodeId; + this.sourceId = sourceId; this.appContext = appContext; this.eventHandler = eventHandler; this.blacklistingEnabled = blacklistingEnabled; @@ -247,7 +249,7 @@ public class AMNodeImpl implements AMNode { /* Blacklist the node with the AMNodeTracker and check if the node should be blacklisted */ protected boolean registerBadNodeAndShouldBlacklist() { - return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this); + return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, sourceId); } protected void blacklistSelf() { http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index 102cbe9..0668ff2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -18,9 +18,8 @@ package org.apache.tez.dag.app.rm.node; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; @@ -29,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; @@ -42,23 +40,21 @@ public class AMNodeTracker extends AbstractService implements static final Logger LOG = LoggerFactory.getLogger(AMNodeTracker.class); - private final ConcurrentHashMap nodeMap; - private final ConcurrentHashMap> blacklistMap; + private final ConcurrentMap perSourceNodeTrackers; + @SuppressWarnings("rawtypes") private final EventHandler eventHandler; private final AppContext appContext; - private int numClusterNodes; - private boolean ignoreBlacklisting = false; + + // Not final since it's setup in serviceInit private int maxTaskFailuresPerNode; private boolean nodeBlacklistingEnabled; private int blacklistDisablePercent; - float currentIgnoreBlacklistingCountThreshold = 0; - + @SuppressWarnings("rawtypes") public AMNodeTracker(EventHandler eventHandler, AppContext appContext) { super("AMNodeMap"); - this.nodeMap = new ConcurrentHashMap(); - this.blacklistMap = new ConcurrentHashMap>(); + this.perSourceNodeTrackers = new ConcurrentHashMap<>(); this.eventHandler = eventHandler; this.appContext = appContext; } @@ -76,7 +72,7 @@ public class AMNodeTracker extends AbstractService implements TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT); LOG.info("blacklistDisablePercent is " + blacklistDisablePercent + - ", blacklistingEnabled: " + nodeBlacklistingEnabled + + ", blacklistingEnabled: " + nodeBlacklistingEnabled + ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode); if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) { @@ -85,130 +81,66 @@ public class AMNodeTracker extends AbstractService implements + ". Should be an integer between 0 and 100 or -1 to disabled"); } } - - public void nodeSeen(NodeId nodeId) { - if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode, - eventHandler, nodeBlacklistingEnabled, appContext)) == null) { - LOG.info("Adding new node: " + nodeId); - } - } - private void addToBlackList(NodeId nodeId) { - String host = nodeId.getHost(); - - if (!blacklistMap.containsKey(host)) { - blacklistMap.putIfAbsent(host, new HashSet()); - } - Set nodes = blacklistMap.get(host); - - if (!nodes.contains(nodeId)) { - nodes.add(nodeId); - } + public void nodeSeen(NodeId nodeId, int sourceId) { + PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(sourceId); + nodeTracker.nodeSeen(nodeId); } - boolean registerBadNodeAndShouldBlacklist(AMNode amNode) { - if (nodeBlacklistingEnabled) { - addToBlackList(amNode.getNodeId()); - computeIgnoreBlacklisting(); - return !ignoreBlacklisting; - } else { - return false; - } + + boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int sourceId) { + return perSourceNodeTrackers.get(sourceId).registerBadNodeAndShouldBlacklist(amNode); } public void handle(AMNodeEvent rEvent) { // No synchronization required until there's multiple dispatchers. - NodeId nodeId = rEvent.getNodeId(); switch (rEvent.getType()) { - case N_NODE_COUNT_UPDATED: - AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent; - numClusterNodes = event.getNodeCount(); - LOG.info("Num cluster nodes = " + numClusterNodes); - recomputeCurrentIgnoreBlacklistingThreshold(); - computeIgnoreBlacklisting(); - break; - case N_TURNED_UNHEALTHY: - case N_TURNED_HEALTHY: - AMNode amNode = nodeMap.get(nodeId); - if (amNode == null) { - LOG.info("Ignoring RM Health Update for unknown node: " + nodeId); - } else { - amNode.handle(rEvent); - } - break; - default: - nodeMap.get(nodeId).handle(rEvent); + case N_CONTAINER_ALLOCATED: + case N_TA_SUCCEEDED: + case N_TA_ENDED: + case N_IGNORE_BLACKLISTING_ENABLED: + case N_IGNORE_BLACKLISTING_DISABLED: + // All of these will only be seen after a node has been registered. + perSourceNodeTrackers.get(rEvent.getSourceId()).handle(rEvent); + break; + case N_TURNED_UNHEALTHY: + case N_TURNED_HEALTHY: + case N_NODE_COUNT_UPDATED: + // These events can be seen without a node having been marked as 'seen' before + getAndCreateIfNeededPerSourceTracker(rEvent.getSourceId()).handle(rEvent); + break; } } - private void recomputeCurrentIgnoreBlacklistingThreshold() { - if (nodeBlacklistingEnabled && blacklistDisablePercent != -1) { - currentIgnoreBlacklistingCountThreshold = - (float) numClusterNodes * blacklistDisablePercent / 100; - } + public AMNode get(NodeId nodeId, int sourceId) { + return perSourceNodeTrackers.get(sourceId).get(nodeId); } - // May be incorrect if there's multiple NodeManagers running on a single host. - // knownNodeCount is based on node managers, not hosts. blacklisting is - // currently based on hosts. - protected void computeIgnoreBlacklisting() { - - boolean stateChanged = false; - - if (!nodeBlacklistingEnabled || blacklistDisablePercent == -1 || blacklistMap.size() == 0) { - return; - } - if (blacklistMap.size() >= currentIgnoreBlacklistingCountThreshold) { - if (ignoreBlacklisting == false) { - ignoreBlacklisting = true; - LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes - + ", Blacklisted: " + blacklistMap.size()); - stateChanged = true; - } - } else { - if (ignoreBlacklisting == true) { - ignoreBlacklisting = false; - LOG.info("Ignore blacklisting set to false. Known: " - + numClusterNodes + ", Blacklisted: " + blacklistMap.size()); - stateChanged = true; - } - } - - if (stateChanged) { - sendIngoreBlacklistingStateToNodes(); - } - } - - private void sendIngoreBlacklistingStateToNodes() { - AMNodeEventType eventType = - ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED - : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED; - for (NodeId nodeId : nodeMap.keySet()) { - sendEvent(new AMNodeEvent(nodeId, eventType)); - } - } - - public AMNode get(NodeId nodeId) { - return nodeMap.get(nodeId); - } - - @SuppressWarnings("unchecked") - private void sendEvent(Event event) { - this.eventHandler.handle(event); - } - - public int getNumNodes() { - return nodeMap.size(); + public int getNumNodes(int sourceId) { + return perSourceNodeTrackers.get(sourceId).getNumNodes(); } @Private @VisibleForTesting - public boolean isBlacklistingIgnored() { - return this.ignoreBlacklisting; + public boolean isBlacklistingIgnored(int sourceId) { + return perSourceNodeTrackers.get(sourceId).isBlacklistingIgnored(); } public void dagComplete(DAG dag) { // TODO TEZ-2337 Maybe reset failures from previous DAGs } + private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int sourceId) { + PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(sourceId); + if (nodeTracker == null) { + nodeTracker = + new PerSourceNodeTracker(sourceId, eventHandler, appContext, maxTaskFailuresPerNode, + nodeBlacklistingEnabled, blacklistDisablePercent); + PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(sourceId, nodeTracker); + nodeTracker = old != null ? old : nodeTracker; + } + return nodeTracker; + } + + } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java new file mode 100644 index 0000000..3264708 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java @@ -0,0 +1,187 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm.node; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.dag.app.AppContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PerSourceNodeTracker { + + static final Logger LOG = LoggerFactory.getLogger(PerSourceNodeTracker.class); + + private final int sourceId; + private final ConcurrentHashMap nodeMap; + private final ConcurrentHashMap> blacklistMap; + + @SuppressWarnings("rawtypes") + private final EventHandler eventHandler; + private final AppContext appContext; + + private final int maxTaskFailuresPerNode; + private final boolean nodeBlacklistingEnabled; + private final int blacklistDisablePercent; + + private int numClusterNodes; + float currentIgnoreBlacklistingCountThreshold = 0; + private boolean ignoreBlacklisting = false; + + @SuppressWarnings("rawtypes") + public PerSourceNodeTracker(int sourceId, EventHandler eventHandler, AppContext appContext, + int maxTaskFailuresPerNode, boolean nodeBlacklistingEnabled, + int blacklistDisablePercent) { + this.sourceId = sourceId; + this.nodeMap = new ConcurrentHashMap<>(); + this.blacklistMap = new ConcurrentHashMap<>(); + this.eventHandler = eventHandler; + this.appContext = appContext; + + this.maxTaskFailuresPerNode = maxTaskFailuresPerNode; + this.nodeBlacklistingEnabled = nodeBlacklistingEnabled; + this.blacklistDisablePercent = blacklistDisablePercent; + } + + + + public void nodeSeen(NodeId nodeId) { + if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, sourceId, maxTaskFailuresPerNode, + eventHandler, nodeBlacklistingEnabled, appContext)) == null) { + LOG.info("Adding new node {} to nodeTracker {}", nodeId, sourceId); + } + } + + public AMNode get(NodeId nodeId) { + return nodeMap.get(nodeId); + } + + public int getNumNodes() { + return nodeMap.size(); + } + + public void handle(AMNodeEvent rEvent) { + // No synchronization required until there's multiple dispatchers. + NodeId nodeId = rEvent.getNodeId(); + switch (rEvent.getType()) { + case N_NODE_COUNT_UPDATED: + AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent; + numClusterNodes = event.getNodeCount(); + LOG.info("Num cluster nodes = " + numClusterNodes); + recomputeCurrentIgnoreBlacklistingThreshold(); + computeIgnoreBlacklisting(); + break; + case N_TURNED_UNHEALTHY: + case N_TURNED_HEALTHY: + AMNode amNode = nodeMap.get(nodeId); + if (amNode == null) { + LOG.info("Ignoring RM Health Update for unknown node: " + nodeId); + } else { + amNode.handle(rEvent); + } + break; + default: + nodeMap.get(nodeId).handle(rEvent); + } + } + + boolean registerBadNodeAndShouldBlacklist(AMNode amNode) { + if (nodeBlacklistingEnabled) { + addToBlackList(amNode.getNodeId()); + computeIgnoreBlacklisting(); + return !ignoreBlacklisting; + } else { + return false; + } + } + + @InterfaceAudience.Private + @VisibleForTesting + public boolean isBlacklistingIgnored() { + return this.ignoreBlacklisting; + } + + private void recomputeCurrentIgnoreBlacklistingThreshold() { + if (nodeBlacklistingEnabled && blacklistDisablePercent != -1) { + currentIgnoreBlacklistingCountThreshold = + (float) numClusterNodes * blacklistDisablePercent / 100; + } + } + + // May be incorrect if there's multiple NodeManagers running on a single host. + // knownNodeCount is based on node managers, not hosts. blacklisting is + // currently based on hosts. + protected void computeIgnoreBlacklisting() { + + boolean stateChanged = false; + + if (!nodeBlacklistingEnabled || blacklistDisablePercent == -1 || blacklistMap.size() == 0) { + return; + } + if (blacklistMap.size() >= currentIgnoreBlacklistingCountThreshold) { + if (ignoreBlacklisting == false) { + ignoreBlacklisting = true; + LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes + + ", Blacklisted: " + blacklistMap.size()); + stateChanged = true; + } + } else { + if (ignoreBlacklisting == true) { + ignoreBlacklisting = false; + LOG.info("Ignore blacklisting set to false. Known: " + + numClusterNodes + ", Blacklisted: " + blacklistMap.size()); + stateChanged = true; + } + } + + if (stateChanged) { + sendIngoreBlacklistingStateToNodes(); + } + } + + private void addToBlackList(NodeId nodeId) { + String host = nodeId.getHost(); + + if (!blacklistMap.containsKey(host)) { + blacklistMap.putIfAbsent(host, new HashSet()); + } + Set nodes = blacklistMap.get(host); + + if (!nodes.contains(nodeId)) { + nodes.add(nodeId); + } + } + + private void sendIngoreBlacklistingStateToNodes() { + AMNodeEventType eventType = + ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED + : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED; + for (NodeId nodeId : nodeMap.keySet()) { + sendEvent(new AMNodeEvent(nodeId, sourceId, eventType)); + } + } + + @SuppressWarnings("unchecked") + private void sendEvent(Event event) { + this.eventHandler.handle(event); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 9882954..0f35bba 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -257,7 +257,7 @@ public class MockDAGAppMaster extends DAGAppMaster { } public void preemptContainer(ContainerData cData) { - getTaskSchedulerEventHandler().containerCompleted(null, + getTaskSchedulerEventHandler().containerCompleted(0, null, ContainerStatus.newInstance(cData.cId, null, "Preempted", ContainerExitStatus.PREEMPTED)); cData.clear(); } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 42d4b0b..7584b4c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -200,7 +200,7 @@ public class TestMockDAGAppMaster { mockLauncher.waitTillContainersLaunched(); ContainerData cData = mockLauncher.getContainers().values().iterator().next(); DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); - mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId); + mockApp.getTaskSchedulerEventHandler().preemptContainer(0, cData.cId); mockLauncher.startScheduling(true); dagClient.waitForCompletion(); http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 080c20f..62edac9 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -216,9 +216,9 @@ public class TestContainerReuse { TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerEventHandler).taskAllocated( - eq(ta11), any(Object.class), eq(containerHost1)); + eq(0), eq(ta11), any(Object.class), eq(containerHost1)); verify(taskSchedulerEventHandler).taskAllocated( - eq(ta21), any(Object.class), eq(containerHost2)); + eq(0), eq(ta21), any(Object.class), eq(containerHost2)); // Adding the event later so that task1 assigned to containerHost1 // is deterministic. @@ -230,7 +230,7 @@ public class TestContainerReuse { drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler, times(1)).taskAllocated( - eq(ta31), any(Object.class), eq(containerHost1)); + eq(0), eq(ta31), any(Object.class), eq(containerHost1)); verify(rmClient, times(0)).releaseAssignedContainer( eq(containerHost1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -245,7 +245,7 @@ public class TestContainerReuse { while (System.currentTimeMillis() < currentTs + 5000l) { try { verify(taskSchedulerEventHandler, - times(1)).containerBeingReleased(eq(containerHost2.getId())); + times(1)).containerBeingReleased(eq(0), eq(containerHost2.getId())); exception = null; break; } catch (Throwable e) { @@ -351,8 +351,8 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1)); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class), eq(containerHost2)); // Adding the event later so that task1 assigned to containerHost1 is deterministic. taskSchedulerEventHandler.handleEvent(lrTa31); @@ -363,7 +363,7 @@ public class TestContainerReuse { drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler, times(0)).taskAllocated( - eq(ta31), any(Object.class), eq(containerHost2)); + eq(0), eq(ta31), any(Object.class), eq(containerHost2)); verify(rmClient, times(1)).releaseAssignedContainer( eq(containerHost2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -459,13 +459,13 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); // Task assigned to container completed successfully. Container should be re-used. taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -475,7 +475,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -483,7 +483,7 @@ public class TestContainerReuse { // Verify no re-use if a previous task fails. taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0)); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container1)); verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null)); verify(rmClient).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -496,7 +496,7 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2)); // Task assigned to container completed successfully. No pending requests. Container should be released. taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0)); @@ -606,14 +606,14 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); // First task had profiling on. This container can not be reused further. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class), + verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -652,14 +652,14 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container2)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2)); // Verify that the container can not be reused when profiling option is turned on // Even for 2 tasks having same profiling option can have container reusability. taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), + verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -698,13 +698,13 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container3)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class), eq(container3)); //Ensure task 6 (of vertex 1) is allocated to same container taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3)); eventHandler.reset(); taskScheduler.close(); @@ -804,7 +804,7 @@ public class TestContainerReuse { TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerEventHandler).taskAllocated( - eq(ta11), any(Object.class), eq(container1)); + eq(0), eq(ta11), any(Object.class), eq(container1)); // Send launch request for task2 (vertex2) taskSchedulerEventHandler.handleEvent(lrEvent12); @@ -818,7 +818,7 @@ public class TestContainerReuse { drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler, times(0)).taskAllocated( - eq(ta12), any(Object.class), eq(container1)); + eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -826,7 +826,7 @@ public class TestContainerReuse { LOG.info("Sleeping to ensure that the scheduling loop runs"); Thread.sleep(3000l); verify(taskSchedulerEventHandler).taskAllocated( - eq(ta12), any(Object.class), eq(container1)); + eq(0), eq(ta12), any(Object.class), eq(container1)); // TA12 completed. taskSchedulerEventHandler.handleEvent( @@ -940,7 +940,7 @@ public class TestContainerReuse { TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerEventHandler).taskAllocated( - eq(ta11), any(Object.class), eq(container1)); + eq(0), eq(ta11), any(Object.class), eq(container1)); // Send launch request for task2 (vertex2) taskSchedulerEventHandler.handleEvent(lrEvent21); @@ -953,7 +953,7 @@ public class TestContainerReuse { drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); verify(taskSchedulerEventHandler).taskAllocated( - eq(ta21), any(Object.class), eq(container1)); + eq(0), eq(ta21), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); // Task 2 completes. @@ -1063,7 +1063,7 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta111), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); @@ -1071,7 +1071,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1114,7 +1114,7 @@ public class TestContainerReuse { // TODO This is terrible, need a better way to ensure the scheduling loop has run LOG.info("Sleeping to ensure that the scheduling loop runs"); Thread.sleep(6000l); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta211), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1124,7 +1124,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1237,7 +1237,7 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta111), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); @@ -1245,7 +1245,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); @@ -1290,7 +1290,7 @@ public class TestContainerReuse { Thread.sleep(6000l); verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); - verify(taskSchedulerEventHandler).taskAllocated(eq(ta211), any(Object.class), eq(container2)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2)); eventHandler.reset(); taskScheduler.close(); @@ -1369,7 +1369,7 @@ public class TestContainerReuse { drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta11), + verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); taskScheduler.close(); taskSchedulerEventHandler.close(); http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index daf1db6..005692e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -163,7 +163,7 @@ public class TestTaskSchedulerEventHandler { AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, priority, containerContext, 0, 0, 0); - schedulerHandler.taskAllocated(mockTaskAttempt, lr, container); + schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container); assertEquals(2, mockEventHandler.events.size()); assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA); AMContainerEventAssignTA assignEvent = @@ -227,7 +227,7 @@ public class TestTaskSchedulerEventHandler { when(mockStatus.getContainerId()).thenReturn(mockCId); when(mockStatus.getDiagnostics()).thenReturn(diagnostics); when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED); - schedulerHandler.containerCompleted(mockTask, mockStatus); + schedulerHandler.containerCompleted(0, mockTask, mockStatus); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); @@ -257,7 +257,7 @@ public class TestTaskSchedulerEventHandler { ContainerId mockCId = mock(ContainerId.class); verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any()); when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer); - schedulerHandler.preemptContainer(mockCId); + schedulerHandler.preemptContainer(0, mockCId); verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); @@ -290,7 +290,7 @@ public class TestTaskSchedulerEventHandler { when(mockStatus.getContainerId()).thenReturn(mockCId); when(mockStatus.getDiagnostics()).thenReturn(diagnostics); when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED); - schedulerHandler.containerCompleted(mockTask, mockStatus); + schedulerHandler.containerCompleted(0, mockTask, mockStatus); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); @@ -325,7 +325,7 @@ public class TestTaskSchedulerEventHandler { // use -104 rather than ContainerExitStatus.KILLED_EXCEEDED_PMEM because // ContainerExitStatus.KILLED_EXCEEDED_PMEM is only available after hadoop-2.5 when(mockStatus.getExitStatus()).thenReturn(-104); - schedulerHandler.containerCompleted(mockTask, mockStatus); + schedulerHandler.containerCompleted(0, mockTask, mockStatus); assertEquals(1, mockEventHandler.events.size()); Event event = mockEventHandler.events.get(0); assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); @@ -383,4 +383,5 @@ public class TestTaskSchedulerEventHandler { } + // TODO TEZ-2003. Add tests with multiple schedulers, and ensuring that events go out with correct IDs. } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index ffab769..04610ab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -134,7 +134,7 @@ class TestTaskSchedulerHelpers { @Override public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { - taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(this, + taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(new TaskSchedulerAppCallbackImpl(this, 0), containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync, appContext); } http://git-wip-us.apache.org/repos/asf/tez/blob/3a143a67/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java index d907ea0..84d2e1f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java @@ -93,12 +93,12 @@ public class TestAMNodeTracker { amNodeTracker.start(); NodeId nodeId = NodeId.newInstance("host1", 2342); - amNodeTracker.nodeSeen(nodeId); + amNodeTracker.nodeSeen(nodeId, 0); NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY); - amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport)); + amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0)); dispatcher.await(); - assertEquals(AMNodeState.UNHEALTHY, amNodeTracker.get(nodeId).getState()); + assertEquals(AMNodeState.UNHEALTHY, amNodeTracker.get(nodeId, 0).getState()); amNodeTracker.stop(); } @@ -114,7 +114,7 @@ public class TestAMNodeTracker { NodeId nodeId = NodeId.newInstance("unknownhost", 2342); NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY); - amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport)); + amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport, 0)); dispatcher.await(); amNodeTracker.stop(); @@ -142,27 +142,27 @@ public class TestAMNodeTracker { amNodeTracker.init(conf); amNodeTracker.start(); - amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1)); + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0)); NodeId nodeId = NodeId.newInstance("host1", 1234); - amNodeTracker.nodeSeen(nodeId); + amNodeTracker.nodeSeen(nodeId, 0); - AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId); + AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0); ContainerId cId1 = mock(ContainerId.class); ContainerId cId2 = mock(ContainerId.class); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId1)); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId2)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2)); TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class); TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class); - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId1, ta1, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId1, ta1, true)); dispatcher.await(); assertEquals(1, node.numFailedTAs); assertEquals(AMNodeState.ACTIVE, node.getState()); - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true)); dispatcher.await(); assertEquals(2, node.numFailedTAs); assertEquals(1, handler.events.size()); @@ -187,44 +187,44 @@ public class TestAMNodeTracker { amNodeTracker.init(conf); amNodeTracker.start(); - amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4)); + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4, 0)); NodeId nodeId = NodeId.newInstance("host1", 1234); NodeId nodeId2 = NodeId.newInstance("host2", 1234); NodeId nodeId3 = NodeId.newInstance("host3", 1234); NodeId nodeId4 = NodeId.newInstance("host4", 1234); - amNodeTracker.nodeSeen(nodeId); - amNodeTracker.nodeSeen(nodeId2); - amNodeTracker.nodeSeen(nodeId3); - amNodeTracker.nodeSeen(nodeId4); - AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId); + amNodeTracker.nodeSeen(nodeId, 0); + amNodeTracker.nodeSeen(nodeId2, 0); + amNodeTracker.nodeSeen(nodeId3, 0); + amNodeTracker.nodeSeen(nodeId4, 0); + AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0); ContainerId cId1 = mock(ContainerId.class); ContainerId cId2 = mock(ContainerId.class); ContainerId cId3 = mock(ContainerId.class); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId1)); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId2)); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId3)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId3)); assertEquals(3, node.containers.size()); TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class); TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class); TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class); - amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, cId1, ta1)); + amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, 0, cId1, ta1)); assertEquals(1, node.numSuccessfulTAs); - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true)); assertEquals(1, node.numSuccessfulTAs); assertEquals(1, node.numFailedTAs); assertEquals(AMNodeState.ACTIVE, node.getState()); // duplicate should not affect anything - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true)); assertEquals(1, node.numSuccessfulTAs); assertEquals(1, node.numFailedTAs); assertEquals(AMNodeState.ACTIVE, node.getState()); - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId3, ta3, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId3, ta3, true)); dispatcher.await(); assertEquals(1, node.numSuccessfulTAs); assertEquals(2, node.numFailedTAs); @@ -246,20 +246,20 @@ public class TestAMNodeTracker { ContainerId cId5 = mock(ContainerId.class); TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class); TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class); - AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, cId4)); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, cId5)); + AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, 0); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId4)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId5)); - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId4, ta4, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId4, ta4, true)); assertEquals(1, node2.numFailedTAs); assertEquals(AMNodeState.ACTIVE, node2.getState()); handler.events.clear(); - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId5, ta5, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId5, ta5, true)); dispatcher.await(); assertEquals(2, node2.numFailedTAs); assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState()); - AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3); + AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, 0); assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState()); assertEquals(5, handler.events.size()); @@ -286,7 +286,7 @@ public class TestAMNodeTracker { // Increase the number of nodes. BLACKLISTING should be re-enabled. // Node 1 and Node 2 should go into BLACKLISTED state handler.events.clear(); - amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8)); + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, 0)); dispatcher.await(); LOG.info(("Completed waiting for dispatcher to process all pending events")); assertEquals(AMNodeState.BLACKLISTED, node.getState()); @@ -336,4 +336,6 @@ public class TestAMNodeTracker { doReturn(healthReportTime).when(nodeReport).getLastHealthReportTime(); return nodeReport; } + + // TODO TEZ-2003. Add tests for multiple sources. }