Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 371AA17882 for ; Tue, 10 Feb 2015 21:52:14 +0000 (UTC) Received: (qmail 49094 invoked by uid 500); 10 Feb 2015 21:52:14 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 49012 invoked by uid 500); 10 Feb 2015 21:52:14 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 48458 invoked by uid 99); 10 Feb 2015 21:52:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Feb 2015 21:52:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 25FA1E08E3; Tue, 10 Feb 2015 21:52:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Tue, 10 Feb 2015 21:52:27 -0000 Message-Id: <2d08b9703dc04634b147b4b3ea24b5cd@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/17] tez git commit: TEZ-1914. VertexManager logic should not run on the central dispatcher (bikas) TEZ-1914. VertexManager logic should not run on the central dispatcher (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f0354689 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f0354689 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f0354689 Branch: refs/heads/TEZ-2003 Commit: f03546896ed13bb605e8f39e738d4221de04bd22 Parents: 4805530 Author: Bikas Saha Authored: Tue Feb 10 13:37:43 2015 -0800 Committer: Bikas Saha Committed: Tue Feb 10 13:37:43 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/AsyncDispatcher.java | 4 +- .../java/org/apache/tez/dag/app/AppContext.java | 4 + .../org/apache/tez/dag/app/DAGAppMaster.java | 21 ++ .../tez/dag/app/dag/event/CallableEvent.java | 42 +++ .../dag/app/dag/event/CallableEventType.java | 25 ++ .../event/VertexEventInputDataInformation.java | 40 +++ .../tez/dag/app/dag/event/VertexEventType.java | 2 + .../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 85 +++-- .../tez/dag/app/dag/impl/VertexManager.java | 311 +++++++++++++++---- .../app/dag/impl/CallableEventDispatcher.java | 37 +++ .../tez/dag/app/dag/impl/TestDAGImpl.java | 35 ++- .../tez/dag/app/dag/impl/TestVertexImpl.java | 99 ++++-- .../tez/dag/app/dag/impl/TestVertexManager.java | 82 +++-- .../vertexmanager/InputReadyVertexManager.java | 2 +- .../TestInputReadyVertexManager.java | 6 - 17 files changed, 646 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4da24a7..d617bee 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1914. VertexManager logic should not run on the central dispatcher TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers. TEZ-1999. IndexOutOfBoundsException during merge. TEZ-2000. Source vertex exists error during DAG submission. http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index c23d669..253db23 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.event.Dispatcher; @@ -87,7 +86,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { this.eventDispatchers = Maps.newHashMap(); } - Runnable createThread() { + public Runnable createThread() { return new Runnable() { @Override public void run() { @@ -122,6 +121,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { @Override protected void serviceInit(Configuration conf) throws Exception { + // TODO TEZ-2049 remove YARN reference this.exitOnDispatchException = conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index f8086d0..5564809 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -37,6 +37,8 @@ import org.apache.tez.common.security.ACLManager; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.records.TezDAGID; +import com.google.common.util.concurrent.ListeningExecutorService; + /** * Context interface for sharing information across components in Tez DAG @@ -63,6 +65,8 @@ public interface AppContext { String getUser(); DAG getCurrentDAG(); + + ListeningExecutorService getExecService(); void setDAG(DAG dag); http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index c7e1e83..5aca3cf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -47,6 +47,8 @@ import java.util.Random; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -163,6 +165,9 @@ import org.codehaus.jettison.json.JSONException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The Tez DAG Application Master. @@ -254,6 +259,10 @@ public class DAGAppMaster extends AbstractService { private Path currentRecoveryDataDir; private Path tezSystemStagingDir; private FileSystem recoveryFS; + + private ExecutorService rawExecutor; + private ListeningExecutorService execService; + /** * set of already executed dag names. */ @@ -483,6 +492,10 @@ public class DAGAppMaster extends AbstractService { } } + rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("App Shared Pool - " + "#%d").build()); + execService = MoreExecutors.listeningDecorator(rawExecutor); + initServices(conf); super.serviceInit(conf); @@ -1261,6 +1274,11 @@ public class DAGAppMaster extends AbstractService { rLock.unlock(); } } + + @Override + public ListeningExecutorService getExecService() { + return execService; + } @Override public Set getAllDAGIDs() { @@ -1677,6 +1695,7 @@ public class DAGAppMaster extends AbstractService { if (this.dagSubmissionTimer != null) { this.dagSubmissionTimer.cancel(); } + stopServices(); // Given pre-emption, we should delete tez scratch dir only if unregister is @@ -1708,6 +1727,8 @@ public class DAGAppMaster extends AbstractService { } } + execService.shutdownNow(); + super.serviceStop(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java new file mode 100644 index 0000000..e148fe8 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEvent.java @@ -0,0 +1,42 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.dag.event; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +import com.google.common.util.concurrent.FutureCallback; + +public abstract class CallableEvent extends AbstractEvent implements + Callable { + private final FutureCallback callback; + + public CallableEvent(FutureCallback callback) { + super(CallableEventType.CALLABLE); + this.callback = callback; + } + + public FutureCallback getCallback() { + return callback; + } + + @Override + public abstract Void call() throws Exception; +} http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java new file mode 100644 index 0000000..e9e93b9 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/CallableEventType.java @@ -0,0 +1,25 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.dag.event; + +public enum CallableEventType { + + CALLABLE, + +} http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java new file mode 100644 index 0000000..6b5cad5 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventInputDataInformation.java @@ -0,0 +1,40 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + +package org.apache.tez.dag.app.dag.event; + +import java.util.List; + +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.TezEvent; + +public class VertexEventInputDataInformation extends VertexEvent { + + private final List events; + + public VertexEventInputDataInformation(TezVertexID vertexId, List events) { + super(vertexId, VertexEventType.V_INPUT_DATA_INFORMATION); + this.events = events; + } + + public List getEvents() { + return events; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java index 5eb4929..aa202a4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java @@ -49,6 +49,8 @@ public enum VertexEventType { //Producer: VertexInputInitializer V_ROOT_INPUT_INITIALIZED, V_ROOT_INPUT_FAILED, + + V_INPUT_DATA_INFORMATION, // Recover Event, Producer:DAG V_RECOVER, http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 149033c..aba20cf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -252,15 +252,15 @@ public class TaskImpl implements Task, EventHandler { .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, EnumSet.of( TaskEventType.T_TERMINATE, + TaskEventType.T_SCHEDULE, TaskEventType.T_ADD_SPEC_ATTEMPT)) // Transitions from KILLED state .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED, EnumSet.of( TaskEventType.T_TERMINATE, + TaskEventType.T_SCHEDULE, TaskEventType.T_ADD_SPEC_ATTEMPT)) - .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, - TaskEventType.T_SCHEDULE) // create the topology tables .installTopology(); http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 865b182..05c3cc1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -40,6 +40,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nullable; import com.google.common.base.Strings; + import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; @@ -114,6 +115,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask; import org.apache.tez.dag.app.dag.event.TaskEventTermination; import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEvent; +import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation; import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError; import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized; import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex; @@ -342,6 +344,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_ROOT_INPUT_INITIALIZED, new RootInputInitializedTransition()) .addTransition(VertexState.INITIALIZING, + EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, + VertexState.FAILED), + VertexEventType.V_INPUT_DATA_INFORMATION, + new InputDataInformationTransition()) + .addTransition(VertexState.INITIALIZING, EnumSet.of(VertexState.INITED, VertexState.FAILED), VertexEventType.V_READY_TO_INIT, new VertexInitializedTransition()) @@ -861,7 +868,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public int getTotalTasks() { - return numTasks; + readLock.lock(); + try { + return numTasks; + } finally { + readLock.unlock(); + } } @Override @@ -2175,7 +2187,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, .getVertexManagerPlugin()); LOG.info("Setting user vertex manager plugin: " + pluginDesc.getClassName() + " on vertex: " + getLogIdentifier()); - vertexManager = new VertexManager(pluginDesc, this, appContext, stateChangeNotifier); + vertexManager = new VertexManager(pluginDesc, dagUgi, this, appContext, stateChangeNotifier); } else { // Intended order of picking a vertex manager // If there is an InputInitializer then we use the RootInputVertexManager. May be fixed by TEZ-703 @@ -2188,26 +2200,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, + logIdentifier); vertexManager = new VertexManager( VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()), - this, appContext, stateChangeNotifier); + dagUgi, this, appContext, stateChangeNotifier); } else if (hasOneToOne && !hasCustom) { LOG.info("Setting vertexManager to InputReadyVertexManager for " + logIdentifier); vertexManager = new VertexManager( VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()), - this, appContext, stateChangeNotifier); + dagUgi, this, appContext, stateChangeNotifier); } else if (hasBipartite && !hasCustom) { LOG.info("Setting vertexManager to ShuffleVertexManager for " + logIdentifier); // shuffle vertex manager needs a conf payload vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(conf).build(), - this, appContext, stateChangeNotifier); + dagUgi, this, appContext, stateChangeNotifier); } else { // schedule all tasks upon vertex start. Default behavior. LOG.info("Setting vertexManager to ImmediateStartVertexManager for " + logIdentifier); vertexManager = new VertexManager( VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()), - this, appContext, stateChangeNotifier); + dagUgi, this, appContext, stateChangeNotifier); } } } @@ -3063,14 +3075,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexState state = vertex.getState(); if (state == VertexState.INITIALIZING) { try { - List inputInfoEvents = - vertex.vertexManager.onRootVertexInitialized( - liInitEvent.getInputName(), - vertex.getAdditionalInputs().get(liInitEvent.getInputName()) - .getIODescriptor(), liInitEvent.getEvents()); - if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) { - VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false); - } + vertex.vertexManager.onRootVertexInitialized(liInitEvent.getInputName(), vertex + .getAdditionalInputs().get(liInitEvent.getInputName()).getIODescriptor(), + liInitEvent.getEvents()); } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); LOG.error(msg, e); @@ -3087,10 +3094,35 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.rootInputInitializerManager.shutdown(); vertex.rootInputInitializerManager = null; } + + // the return of these events from the VM will complete initialization and move into + // INITED state if possible via InputDataInformationTransition + + return vertex.getState(); + } + } + + public static class InputDataInformationTransition implements + MultipleArcTransition { + + @Override + public VertexState transition(VertexImpl vertex, VertexEvent event) { + VertexEventInputDataInformation iEvent = (VertexEventInputDataInformation) event; + List inputInfoEvents = iEvent.getEvents(); + try { + if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) { + VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false); + } + } catch (AMUserCodeException e) { + String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); + LOG.error(msg, e); + vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE, msg + "," + + ExceptionUtils.getStackTrace(e.getCause())); + return VertexState.FAILED; + } // done. check if we need to do the initialization - if (vertex.getState() == VertexState.INITIALIZING && - vertex.initWaitsForRootInitializers) { + if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) { if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) { // set the wait flag to false if all initializers are done vertex.initWaitsForRootInitializers = false; @@ -4021,7 +4053,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public Map> getAdditionalInputs() { - return this.rootInputDescriptors; + readLock.lock(); + try { + return this.rootInputDescriptors; + } finally { + readLock.unlock(); + } } @Nullable @@ -4059,7 +4096,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public Map getInputVertices() { - return Collections.unmodifiableMap(this.sourceVertices); + readLock.lock(); + try { + return Collections.unmodifiableMap(this.sourceVertices); + } finally { + readLock.unlock(); + } } @Override @@ -4092,7 +4134,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } public Resource getTaskResource() { - return taskResource; + readLock.lock(); + try { + return taskResource; + } finally { + readLock.unlock(); + } } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index da86151..af92348 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -20,6 +20,8 @@ package org.apache.tez.dag.app.dag.impl; import static com.google.common.base.Preconditions.checkNotNull; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -33,6 +35,7 @@ import javax.annotation.Nullable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ReflectionUtils; @@ -54,6 +57,8 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.CallableEvent; +import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation; import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError; import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; import org.apache.tez.dag.app.dag.VertexStateUpdateListener; @@ -62,7 +67,6 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputSpecUpdate; import org.apache.tez.runtime.api.events.InputDataInformationEvent; -import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; @@ -72,18 +76,30 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +@SuppressWarnings("unchecked") public class VertexManager { - VertexManagerPluginDescriptor pluginDesc; - VertexManagerPlugin plugin; - Vertex managedVertex; - VertexManagerPluginContextImpl pluginContext; - UserPayload payload = null; - AppContext appContext; - BlockingQueue rootInputInitEventQueue; - StateChangeNotifier stateChangeNotifier; + final VertexManagerPluginDescriptor pluginDesc; + final UserGroupInformation dagUgi; + final VertexManagerPlugin plugin; + final Vertex managedVertex; + final VertexManagerPluginContextImpl pluginContext; + final UserPayload payload; + final AppContext appContext; + final BlockingQueue rootInputInitEventQueue; + final StateChangeNotifier stateChangeNotifier; + + private final ListeningExecutorService execService; + private final LinkedBlockingQueue eventQueue; + private final AtomicBoolean eventInFlight; + private final AtomicBoolean pluginFailed; private static final Log LOG = LogFactory.getLog(VertexManager.class); + private final VertexManagerCallback VM_CALLBACK = new VertexManagerCallback(); class VertexManagerPluginContextImpl implements VertexManagerPluginContext, VertexStateUpdateListener { @@ -97,6 +113,9 @@ public class VertexManager { if (isComplete()) { throw new TezUncheckedException("Cannot invoke context methods after reporting done"); } + if (pluginFailed.get()) { + throw new TezUncheckedException("Cannot invoke context methods after throwing an exception"); + } } @Override @@ -233,6 +252,7 @@ public class VertexManager { return appContext.getTaskScheduler().getNumClusterNodes(); } + // TODO TEZ-2048. Remove this API @Override public synchronized Container getTaskContainer(String vertexName, Integer taskIndex) { checkAndThrowIfDone(); @@ -287,41 +307,36 @@ public class VertexManager { managedVertex.doneReconfiguringVertex(); } - @SuppressWarnings("unchecked") @Override public synchronized void onStateUpdated(VertexStateUpdate event) { - if (isComplete()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Dropping state update for vertex=" + event.getVertexName() + ", state=" + - event.getVertexState() + - " since vertexmanager for " + managedVertex.getLogIdentifier() + " is complete."); - } - } else { - try { - plugin.onVertexStateUpdated(event); - } catch (Exception e) { - // state change must be triggered via an event transition - appContext.getEventHandler().handle( - new VertexEventManagerUserCodeError(managedVertex.getVertexId(), - new AMUserCodeException(Source.VertexManager, e))); - } - } + enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStateUpdate(event)); } } - public VertexManager(VertexManagerPluginDescriptor pluginDesc, + public VertexManager(VertexManagerPluginDescriptor pluginDesc, UserGroupInformation dagUgi, Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) { checkNotNull(pluginDesc, "pluginDesc is null"); checkNotNull(managedVertex, "managedVertex is null"); checkNotNull(appContext, "appContext is null"); checkNotNull(stateChangeNotifier, "notifier is null"); this.pluginDesc = pluginDesc; + this.dagUgi = dagUgi; this.managedVertex = managedVertex; this.appContext = appContext; this.stateChangeNotifier = stateChangeNotifier; // don't specify the size of rootInputInitEventQueue, otherwise it will fail when addAll this.rootInputInitEventQueue = new LinkedBlockingQueue(); + + pluginContext = new VertexManagerPluginContextImpl(); + Preconditions.checkArgument(pluginDesc != null); + plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(), + new Class[] { VertexManagerPluginContext.class }, new Object[] { pluginContext }); + payload = pluginDesc.getUserPayload(); + execService = appContext.getExecService(); + eventQueue = new LinkedBlockingQueue(); + eventInFlight = new AtomicBoolean(false); + pluginFailed = new AtomicBoolean(false); } public VertexManagerPlugin getPlugin() { @@ -329,20 +344,57 @@ public class VertexManager { } public void initialize() throws AMUserCodeException { - pluginContext = new VertexManagerPluginContextImpl(); - if (pluginDesc != null) { - plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(), - new Class[]{VertexManagerPluginContext.class}, new Object[]{pluginContext}); - payload = pluginDesc.getUserPayload(); - } try { if (!pluginContext.isComplete()) { - plugin.initialize(); + // TODO TEZ-2066 tracks moving this async. + synchronized (VertexManager.this) { + plugin.initialize(); + } } } catch (Exception e) { throw new AMUserCodeException(Source.VertexManager, e); } } + + private boolean pluginInvocationAllowed(String msg) { + if (pluginFailed.get()) { + if (LOG.isDebugEnabled()) { + LOG.debug(msg + " . Manager failed. Vertex=" + managedVertex.getLogIdentifier()); + } + return false; + } + if (pluginContext.isComplete()) { + if (LOG.isDebugEnabled()) { + LOG.debug(msg+ " . Manager complete. Not scheduling event. Vertex=" + managedVertex.getLogIdentifier()); + } + return false; + } + return true; + } + + private void enqueueAndScheduleNextEvent(VertexManagerEvent e) { + if (!pluginInvocationAllowed("Dropping event")) { + return; + } + eventQueue.add(e); + tryScheduleNextEvent(); + } + + private void tryScheduleNextEvent() { + if (!pluginInvocationAllowed("Not scheduling")) { + return; + } + if (eventQueue.isEmpty()) { + return; + } + if (eventInFlight.compareAndSet(false, true)) { + // no event was in flight + VertexManagerEvent e = eventQueue.poll(); + Preconditions.checkState(e != null); + ListenableFuture future = execService.submit(e); + Futures.addCallback(future, e.getCallback()); + } + } public void onVertexStarted(List completions) throws AMUserCodeException { Map> pluginCompletionsMap = Maps.newHashMap(); @@ -360,53 +412,180 @@ public class VertexManager { taskIdList.add(taskId); } } - try { - if (!pluginContext.isComplete()) { - plugin.onVertexStarted(pluginCompletionsMap); - } - } catch (Exception e) { - throw new AMUserCodeException(Source.VertexManager, e); - } + enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStarted(pluginCompletionsMap)); } public void onSourceTaskCompleted(TezTaskID tezTaskId) throws AMUserCodeException { Integer taskId = Integer.valueOf(tezTaskId.getId()); String vertexName = appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName(); - try { - if (!pluginContext.isComplete()) { - plugin.onSourceTaskCompleted(vertexName, taskId); - } - } catch (Exception e) { - throw new AMUserCodeException(Source.VertexManager, e); + enqueueAndScheduleNextEvent(new VertexManagerEventSourceTaskCompleted(taskId, vertexName)); + } + + public void onVertexManagerEventReceived( + org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent) throws AMUserCodeException { + enqueueAndScheduleNextEvent(new VertexManagerEventReceived(vmEvent)); + } + + public void onRootVertexInitialized(String inputName, + InputDescriptor inputDescriptor, List events) throws AMUserCodeException { + if (LOG.isDebugEnabled()) { + LOG.debug("vertex:" + managedVertex.getLogIdentifier() + "; enqueueing onRootVertexInitialized" + + " on input:" + inputName + ", current task events size is " + rootInputInitEventQueue.size()); } + enqueueAndScheduleNextEvent(new VertexManagerEventRootInputInitialized(inputName, + inputDescriptor, events)); } - public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws AMUserCodeException { - try { - if (!pluginContext.isComplete()) { - plugin.onVertexManagerEventReceived(vmEvent); + private class VertexManagerCallback implements FutureCallback { + + @Override + public void onFailure(Throwable t) { + // stop further event processing + pluginFailed.set(true); + eventQueue.clear(); + // catch real root cause of failure, it would throw UndeclaredThrowableException + // if using UGI.doAs + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); } - } catch (Exception e) { - throw new AMUserCodeException(Source.VertexManager, e); + Preconditions.checkState(appContext != null); + Preconditions.checkState(managedVertex != null); + // state change must be triggered via an event transition + appContext.getEventHandler().handle( + new VertexEventManagerUserCodeError(managedVertex.getVertexId(), + new AMUserCodeException(Source.VertexManager, t))); + // enqueue no further events due to user code error + } + + @Override + public void onSuccess(Void result) { + Preconditions.checkState(eventInFlight.get()); + eventInFlight.set(false); + tryScheduleNextEvent(); } } + + private class VertexManagerRootInputInitializedCallback extends VertexManagerCallback { - public List onRootVertexInitialized(String inputName, - InputDescriptor inputDescriptor, List events) throws AMUserCodeException { - try { - if (!pluginContext.isComplete()) { - plugin.onRootVertexInitialized(inputName, inputDescriptor, events); + @Override + public void onSuccess(Void result) { + super.onSuccess(result); + if (LOG.isDebugEnabled()) { + LOG.debug("vertex:" + managedVertex.getLogIdentifier() + + "; after call of VertexManagerPlugin.onRootVertexInitialized" + " on input:" + + ", current task events size is " + rootInputInitEventQueue.size()); } - } catch (Exception e) { - throw new AMUserCodeException(Source.VertexManager, e); + List resultEvents = new ArrayList(); + rootInputInitEventQueue.drainTo(resultEvents); + appContext.getEventHandler().handle( + new VertexEventInputDataInformation(managedVertex.getVertexId(), resultEvents)); } - if (LOG.isDebugEnabled()) { - LOG.debug("vertex:" + managedVertex.getLogIdentifier() + "; after call of VertexManagerPlugin.onRootVertexInitialized" - + " on input:" + inputName + ", current task events size is " + rootInputInitEventQueue.size()); + } + + class VertexManagerEventOnVertexStateUpdate extends VertexManagerEvent { + private final VertexStateUpdate event; + + public VertexManagerEventOnVertexStateUpdate(VertexStateUpdate event) { + this.event = event; + } + + @Override + public void invoke() throws Exception { + plugin.onVertexStateUpdated(event); + } + + } + + class VertexManagerEventOnVertexStarted extends VertexManagerEvent { + private final Map> pluginCompletionsMap; + + public VertexManagerEventOnVertexStarted(Map> pluginCompletionsMap) { + this.pluginCompletionsMap = pluginCompletionsMap; + } + + @Override + public void invoke() throws Exception { + plugin.onVertexStarted(pluginCompletionsMap); + } + + } + + class VertexManagerEventSourceTaskCompleted extends VertexManagerEvent { + private final Integer taskId; + private final String vertexName; + + public VertexManagerEventSourceTaskCompleted(Integer taskId, String vertexName) { + this.taskId = taskId; + this.vertexName = vertexName; + } + + @Override + public void invoke() throws Exception { + plugin.onSourceTaskCompleted(vertexName, taskId); } - List resultEvents = new ArrayList(); - rootInputInitEventQueue.drainTo(resultEvents); - return resultEvents; + + } + + class VertexManagerEventReceived extends VertexManagerEvent { + private final org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent; + + public VertexManagerEventReceived(org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent) { + this.vmEvent = vmEvent; + } + + @Override + public void invoke() throws Exception { + plugin.onVertexManagerEventReceived(vmEvent); + } + + } + + class VertexManagerEventRootInputInitialized extends VertexManagerEvent { + private final String inputName; + private final InputDescriptor inputDescriptor; + private final List events; + + public VertexManagerEventRootInputInitialized(String inputName, + InputDescriptor inputDescriptor, List events) { + super(new VertexManagerRootInputInitializedCallback()); + this.inputName = inputName; + this.inputDescriptor = inputDescriptor; + this.events = events; + } + + @Override + public void invoke() throws Exception { + plugin.onRootVertexInitialized(inputName, inputDescriptor, events); + } + + } + + abstract class VertexManagerEvent extends CallableEvent { + public VertexManagerEvent() { + this(VM_CALLBACK); + } + public VertexManagerEvent(VertexManagerCallback callback) { + super(callback); + } + + @Override + public Void call() throws Exception { + final VertexManager manager = VertexManager.this; + manager.dagUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + synchronized (manager) { + if (manager.pluginInvocationAllowed("Not invoking")) { + invoke(); + } + } + return null; + } + }); + return null; + } + + public abstract void invoke() throws Exception; } } http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java new file mode 100644 index 0000000..a81bd68 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/CallableEventDispatcher.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.impl; + +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.dag.app.dag.event.CallableEvent; + +public class CallableEventDispatcher implements EventHandler { + + @Override + public void handle(CallableEvent event) { + try { + event.call(); + event.getCallback().onSuccess(null); + } catch (Exception e) { + event.getCallback().onFailure(e); + } + } + + } + http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index cae9059..599f01e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.app.dag.impl; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -26,9 +27,10 @@ import static org.mockito.Mockito.verify; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -37,6 +39,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -58,7 +61,6 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos; @@ -85,6 +87,8 @@ import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.VertexTerminationCause; +import org.apache.tez.dag.app.dag.event.CallableEvent; +import org.apache.tez.dag.app.dag.event.CallableEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGEvent; @@ -125,8 +129,13 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.protobuf.ByteString; public class TestDAGImpl { @@ -136,6 +145,7 @@ public class TestDAGImpl { private TezDAGID dagId; private static Configuration conf; private DrainDispatcher dispatcher; + private ListeningExecutorService execService; private Credentials fsTokens; private AppContext appContext; private ACLManager aclManager; @@ -724,6 +734,7 @@ public class TestDAGImpl { MockDNSToSwitchMapping.initializeMockRackResolver(); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Before public void setup() { conf = new Configuration(); @@ -736,6 +747,19 @@ public class TestDAGImpl { dispatcher = new DrainDispatcher(); fsTokens = new Credentials(); appContext = mock(AppContext.class); + execService = mock(ListeningExecutorService.class); + final ListenableFuture mockFuture = mock(ListenableFuture.class); + + Mockito.doAnswer(new Answer() { + public ListenableFuture answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + CallableEvent e = (CallableEvent) args[0]; + dispatcher.getEventHandler().handle(e); + return mockFuture; + }}) + .when(execService).submit((Callable) any()); + + doReturn(execService).when(appContext).getExecService(); historyEventHandler = mock(HistoryEventHandler.class); aclManager = new ACLManager("amUser"); doReturn(conf).when(appContext).getAMConf(); @@ -750,6 +774,7 @@ public class TestDAGImpl { doReturn(dag).when(appContext).getCurrentDAG(); mrrAppContext = mock(AppContext.class); doReturn(aclManager).when(mrrAppContext).getAMACLManager(); + doReturn(execService).when(mrrAppContext).getExecService(); mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2); mrrDagPlan = createTestMRRDAGPlan(); mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan, @@ -763,6 +788,7 @@ public class TestDAGImpl { doReturn(historyEventHandler).when(mrrAppContext).getHistoryHandler(); groupAppContext = mock(AppContext.class); doReturn(aclManager).when(groupAppContext).getAMACLManager(); + doReturn(execService).when(groupAppContext).getExecService(); groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3); groupDagPlan = createGroupDAGPlan(); groupDag = new DAGImpl(groupDagId, conf, groupDagPlan, @@ -778,6 +804,7 @@ public class TestDAGImpl { // reset totalCommitCounter to 0 TotalCountingOutputCommitter.totalCommitCounter = 0; + dispatcher.register(CallableEventType.class, new CallableEventDispatcher()); taskEventDispatcher = new TaskEventDispatcher(); dispatcher.register(TaskEventType.class, taskEventDispatcher); taskAttemptEventDispatcher = new TaskAttemptEventDispatcher(); @@ -797,6 +824,7 @@ public class TestDAGImpl { public void teardown() { dispatcher.await(); dispatcher.stop(); + execService.shutdownNow(); dagPlan = null; dag = null; } @@ -817,6 +845,7 @@ public class TestDAGImpl { dispatcher.getEventHandler(), taskAttemptListener, fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext); doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf(); + doReturn(execService).when(dagWithCustomEdgeAppContext).getExecService(); doReturn(dagWithCustomEdge).when(dagWithCustomEdgeAppContext).getCurrentDAG(); doReturn(appAttemptId).when(dagWithCustomEdgeAppContext).getApplicationAttemptId(); doReturn(appAttemptId.getApplicationId()).when(dagWithCustomEdgeAppContext).getApplicationID(); @@ -838,7 +867,7 @@ public class TestDAGImpl { dispatcher.await(); Assert.assertEquals(DAGState.RUNNING, impl.getState()); } - + @Test(timeout = 5000) public void testDAGInit() { initDAG(dag); http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 83a3a8a..2c6f5e0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -38,6 +38,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -116,6 +117,8 @@ import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.VertexStateUpdateListener; import org.apache.tez.dag.app.dag.VertexTerminationCause; +import org.apache.tez.dag.app.dag.event.CallableEvent; +import org.apache.tez.dag.app.dag.event.CallableEventType; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; @@ -176,10 +179,13 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; import org.mockito.internal.util.collections.Sets; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -187,6 +193,7 @@ import org.mockito.stubbing.Answer; public class TestVertexImpl { private static final Log LOG = LogFactory.getLog(TestVertexImpl.class); + private ListeningExecutorService execService; private boolean useCustomInitializer = false; private InputInitializer customInitializer = null; @@ -2119,6 +2126,7 @@ public class TestVertexImpl { anyInt()); } + @SuppressWarnings({ "unchecked", "rawtypes" }) public void setupPostDagCreation() throws AMUserCodeException { String dagName = "dag0"; dispatcher = new DrainDispatcher(); @@ -2138,6 +2146,19 @@ public class TestVertexImpl { doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID(); doReturn(dag).when(appContext).getCurrentDAG(); + execService = mock(ListeningExecutorService.class); + final ListenableFuture mockFuture = mock(ListenableFuture.class); + + Mockito.doAnswer(new Answer() { + public ListenableFuture answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + CallableEvent e = (CallableEvent) args[0]; + dispatcher.getEventHandler().handle(e); + return mockFuture; + }}) + .when(execService).submit((Callable) any()); + + doReturn(execService).when(appContext).getExecService(); doReturn(conf).when(appContext).getAMConf(); doReturn(new Credentials()).when(dag).getCredentials(); doReturn(DAGPlan.getDefaultInstance()).when(dag).getJobPlan(); @@ -2191,6 +2212,7 @@ public class TestVertexImpl { edge.initialize(); } + dispatcher.register(CallableEventType.class, new CallableEventDispatcher()); taskAttemptEventDispatcher = new TaskAttemptEventDispatcher(); dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher); taskEventDispatcher = new TaskEventDispatcher(); @@ -2224,6 +2246,7 @@ public class TestVertexImpl { dispatcher.await(); dispatcher.stop(); } + execService.shutdownNow(); dispatcher = null; vertexEventDispatcher = null; dagEventDispatcher = null; @@ -2247,7 +2270,6 @@ public class TestVertexImpl { } } - @SuppressWarnings("unchecked") private void initVertex(VertexImpl v) { Assert.assertEquals(VertexState.NEW, v.getState()); @@ -2382,6 +2404,7 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITED, v2.getState()); Assert.assertEquals(0, listener.events.size()); // configured event not sent startVertex(v1, true); + dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v2.getState()); Assert.assertEquals(1, listener.events.size()); // configured event sent after VM Assert.assertEquals("vertex2", listener.events.get(0).getVertexName()); @@ -2821,12 +2844,13 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testVertexTaskAttemptProcessorFailure() { + public void testVertexTaskAttemptProcessorFailure() throws Exception { initAllVertices(VertexState.INITED); VertexImpl v = vertices.get("vertex1"); startVertex(v); + dispatcher.await(); TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); @@ -2856,12 +2880,13 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testVertexTaskAttemptInputFailure() { + public void testVertexTaskAttemptInputFailure() throws Exception { initAllVertices(VertexState.INITED); VertexImpl v = vertices.get("vertex1"); startVertex(v); + dispatcher.await(); TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); @@ -2892,12 +2917,13 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testVertexTaskAttemptOutputFailure() { + public void testVertexTaskAttemptOutputFailure() throws Exception { initAllVertices(VertexState.INITED); VertexImpl v = vertices.get("vertex1"); startVertex(v); + dispatcher.await(); TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); @@ -3355,7 +3381,7 @@ public class TestVertexImpl { @Test(timeout = 5000) - public void testVertexWithOneToOneSplit() throws AMUserCodeException { + public void testVertexWithOneToOneSplit() throws Exception { // create a diamond shaped dag with 1-1 edges. // split the source and remaining vertices should split equally // vertex with 2 incoming splits from the same source should split once @@ -3386,7 +3412,7 @@ public class TestVertexImpl { RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager(); List v1Hints = createTaskLocationHints(numTasks); initializerManager1.completeInputInitialization(0, numTasks, v1Hints); - + dispatcher.await(); Assert.assertEquals(VertexState.INITED, v1.getState()); Assert.assertEquals(numTasks, v1.getTotalTasks()); Assert.assertEquals(RootInputVertexManager.class.getName(), v1 @@ -3437,10 +3463,10 @@ public class TestVertexImpl { // fudge vertex manager so that tasks dont start running v1.vertexManager = new VertexManager( VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()), - v1, appContext, mock(StateChangeNotifier.class)); + UserGroupInformation.getCurrentUser(), v1, appContext, mock(StateChangeNotifier.class)); v1.vertexManager.initialize(); startVertex(v1); - + dispatcher.await(); Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks()); Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks()); Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks()); @@ -3476,7 +3502,7 @@ public class TestVertexImpl { // fudge vertex manager so that tasks dont start running v1.vertexManager = new VertexManager( VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()), - v1, appContext, mock(StateChangeNotifier.class)); + UserGroupInformation.getCurrentUser(), v1, appContext, mock(StateChangeNotifier.class)); v1.vertexManager.initialize(); Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks()); @@ -3519,7 +3545,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testVertexWithInitializerFailure() throws AMUserCodeException { + public void testVertexWithInitializerFailure() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); @@ -3548,7 +3574,7 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITIALIZING, v2.getState()); RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager(); initializerManager2.failInputInitialization(); - + dispatcher.await(); Assert.assertEquals(VertexState.FAILED, v2.getState()); Assert.assertEquals(RootInputVertexManager.class.getName(), v2 .getVertexManager().getPlugin().getClass().getName()); @@ -4400,7 +4426,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testVertexWithMultipleInitializers1() throws AMUserCodeException { + public void testVertexWithMultipleInitializers1() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer"); @@ -4419,15 +4445,17 @@ public class TestVertexImpl { // Complete initializer which sets parallelism first initializerManager1.completeInputInitialization(0, 5, v1Hints); + dispatcher.await(); Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); // Complete second initializer initializerManager1.completeInputInitialization(1); + dispatcher.await(); Assert.assertEquals(VertexState.INITED, v1.getState()); } @Test(timeout = 5000) - public void testVertexWithMultipleInitializers2() throws AMUserCodeException { + public void testVertexWithMultipleInitializers2() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer"); @@ -4446,16 +4474,18 @@ public class TestVertexImpl { // Complete initializer which does not set parallelism initializerManager1.completeInputInitialization(1); + dispatcher.await(); Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); // Complete second initializer which sets parallelism initializerManager1.completeInputInitialization(0, 5, v1Hints); + dispatcher.await(); Assert.assertEquals(VertexState.INITED, v1.getState()); } @SuppressWarnings("unchecked") @Test(timeout = 500000) - public void testVertexWithInitializerSuccess() throws AMUserCodeException { + public void testVertexWithInitializerSuccess() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); @@ -4470,7 +4500,7 @@ public class TestVertexImpl { RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager(); List v1Hints = createTaskLocationHints(5); initializerManager1.completeInputInitialization(0, 5, v1Hints); - + dispatcher.await(); Assert.assertEquals(VertexState.INITED, v1.getState()); Assert.assertEquals(5, v1.getTotalTasks()); // task events get buffered @@ -4510,7 +4540,7 @@ public class TestVertexImpl { RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager(); List v2Hints = createTaskLocationHints(10); initializerManager2.completeInputInitialization(0, 10, v2Hints); - + dispatcher.await(); Assert.assertEquals(VertexState.INITED, v2.getState()); Assert.assertEquals(10, v2.getTotalTasks()); // task events get buffered @@ -4530,7 +4560,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testVertexWithInputDistributor() throws AMUserCodeException { + public void testVertexWithInputDistributor() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputDistributor("TestInputInitializer"); @@ -4547,6 +4577,7 @@ public class TestVertexImpl { RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager(); byte[] payload = new byte[0]; initializerManager1.completeInputDistribution(payload); + dispatcher.await(); // edge is still null so its initializing Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); Assert.assertEquals(true, initializerManager1.hasShutDown); @@ -4565,7 +4596,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testVertexRootInputSpecUpdateAll() throws AMUserCodeException { + public void testVertexRootInputSpecUpdateAll() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); @@ -4580,7 +4611,7 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITIALIZING, v3.getState()); RootInputInitializerManagerControlled initializerManager1 = v3.getRootInputInitializerManager(); initializerManager1.completeInputInitialization(); - + dispatcher.await(); Assert.assertEquals(VertexState.INITED, v3.getState()); Assert.assertEquals(expectedNumTasks, v3.getTotalTasks()); Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), v3.getVertexManager() @@ -4596,7 +4627,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testVertexRootInputSpecUpdatePerTask() throws AMUserCodeException { + public void testVertexRootInputSpecUpdatePerTask() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer"); @@ -4611,7 +4642,7 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITIALIZING, v4.getState()); RootInputInitializerManagerControlled initializerManager1 = v4.getRootInputInitializerManager(); initializerManager1.completeInputInitialization(); - + dispatcher.await(); Assert.assertEquals(VertexState.INITED, v4.getState()); Assert.assertEquals(expectedNumTasks, v4.getTotalTasks()); Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), v4.getVertexManager() @@ -4969,7 +5000,7 @@ public class TestVertexImpl { // fudge the vm so we can do custom stuff vB.vertexManager = new VertexManager( VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()), - vB, appContext, mock(StateChangeNotifier.class)); + UserGroupInformation.getCurrentUser(), vB, appContext, mock(StateChangeNotifier.class)); vB.vertexReconfigurationPlanned(); @@ -5093,7 +5124,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testExceptionFromVM_OnRootVertexInitialized() throws AMUserCodeException { + public void testExceptionFromVM_OnRootVertexInitialized() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnRootVertexInitialized); @@ -5118,7 +5149,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testExceptionFromVM_OnVertexStarted() throws AMUserCodeException { + public void testExceptionFromVM_OnVertexStarted() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexStarted); @@ -5136,7 +5167,7 @@ public class TestVertexImpl { dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START)); dispatcher.await(); - + Assert.assertEquals(VertexManagerWithException.class, v1.vertexManager.getPlugin().getClass()); Assert.assertEquals(VertexState.FAILED, v1.getState()); String diagnostics = StringUtils.join(v1.getDiagnostics(), ","); @@ -5146,7 +5177,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testExceptionFromVM_OnSourceTaskCompleted() throws AMUserCodeException { + public void testExceptionFromVM_OnSourceTaskCompleted() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnSourceTaskCompleted); @@ -5183,7 +5214,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testExceptionFromVM_OnVertexManagerEventReceived() throws AMUserCodeException { + public void testExceptionFromVM_OnVertexManagerEventReceived() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.OnVertexManagerEventReceived); @@ -5210,7 +5241,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws AMUserCodeException { + public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestVMStateUpdate", VMExceptionLocation.OnVertexManagerVertexStateUpdated); @@ -5228,7 +5259,7 @@ public class TestVertexImpl { dispatcher.await(); Assert.assertEquals(VertexState.INITED, v2.getState()); startVertex(v1, false); - + dispatcher.await(); Assert.assertEquals(VertexState.FAILED, v2.getState()); String diagnostics = StringUtils.join(v2.getDiagnostics(), ","); assertTrue(diagnostics.contains(VMExceptionLocation.OnVertexManagerVertexStateUpdated.name())); @@ -5261,7 +5292,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testExceptionFromII_InitFailedAfterInitialized() throws AMUserCodeException { + public void testExceptionFromII_InitFailedAfterInitialized() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithIIException(); @@ -5272,6 +5303,7 @@ public class TestVertexImpl { initVertex(v1); RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager(); initializerManager1.completeInputInitialization(0); + dispatcher.await(); Assert.assertEquals(VertexState.INITED, v1.getState()); String errorMsg = "ErrorWhenInitFailureAtInited"; dispatcher.getEventHandler().handle(new VertexEventRootInputFailed(v1.getVertexId(), "input1", @@ -5285,7 +5317,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testExceptionFromII_InitFailedAfterRunning() throws AMUserCodeException { + public void testExceptionFromII_InitFailedAfterRunning() throws Exception { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithIIException(); @@ -5296,6 +5328,7 @@ public class TestVertexImpl { initVertex(v1); RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager(); initializerManager1.completeInputInitialization(0); + dispatcher.await(); startVertex(v1); Assert.assertEquals(VertexState.RUNNING, v1.getState()); String errorMsg = "ErrorWhenInitFailureAtRunning"; @@ -5310,7 +5343,7 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) - public void testExceptionFromII_HandleInputInitializerEvent() throws AMUserCodeException, InterruptedException { + public void testExceptionFromII_HandleInputInitializerEvent() throws Exception { useCustomInitializer = true; customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.HandleInputInitializerEvent); EventHandlingRootInputInitializer initializer = @@ -5351,7 +5384,7 @@ public class TestVertexImpl { dispatcher.getEventHandler() .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent))); dispatcher.await(); - + // it would cause v2 fail as its II throw exception in handleInputInitializerEvent String diagnostics = StringUtils.join(v2.getDiagnostics(), ","); assertTrue(diagnostics.contains(IIExceptionLocation.HandleInputInitializerEvent.name())); http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java index 73dc5eb..81cb42a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java @@ -23,6 +23,8 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.HashMap; @@ -31,7 +33,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; @@ -39,38 +44,73 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.CallableEvent; +import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.TezEvent; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +@SuppressWarnings({ "rawtypes", "unchecked" }) public class TestVertexManager { - - @Test(timeout = 5000) - public void testOnRootVertexInitialized() throws Exception { - Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS); - AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + AppContext mockAppContext; + ListeningExecutorService execService; + Vertex mockVertex; + EventHandler mockHandler; + ArgumentCaptor requestCaptor; + + @Before + public void setup() { + mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + execService = mock(ListeningExecutorService.class); + final ListenableFuture mockFuture = mock(ListenableFuture.class); + Mockito.doAnswer(new Answer() { + public ListenableFuture answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + CallableEvent e = (CallableEvent) args[0]; + new CallableEventDispatcher().handle(e); + return mockFuture; + }}) + .when(execService).submit((Callable) any()); + doReturn(execService).when(mockAppContext).getExecService(); + mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS); doReturn("vertex1").when(mockVertex).getName(); + mockHandler = mock(EventHandler.class); + when(mockAppContext.getEventHandler()).thenReturn(mockHandler); when( mockAppContext.getCurrentDAG().getVertex(any(String.class)) .getTotalTasks()).thenReturn(1); + requestCaptor = ArgumentCaptor.forClass(VertexEventInputDataInformation.class); + } + + @Test(timeout = 5000) + public void testOnRootVertexInitialized() throws Exception { VertexManager vm = new VertexManager( VertexManagerPluginDescriptor.create(RootInputVertexManager.class - .getName()), mockVertex, mockAppContext, mock(StateChangeNotifier.class)); + .getName()), UserGroupInformation.getCurrentUser(), + mockVertex, mockAppContext, mock(StateChangeNotifier.class)); vm.initialize(); InputDescriptor id1 = mock(InputDescriptor.class); List events1 = new LinkedList(); InputDataInformationEvent diEvent1 = InputDataInformationEvent.createWithSerializedPayload(0, null); events1.add(diEvent1); - List tezEvents1 = - vm.onRootVertexInitialized("input1", id1, events1); + vm.onRootVertexInitialized("input1", id1, events1); + verify(mockHandler, times(1)).handle(requestCaptor.capture()); + List tezEvents1 = requestCaptor.getValue().getEvents(); assertEquals(1, tezEvents1.size()); assertEquals(diEvent1, tezEvents1.get(0).getEvent()); @@ -79,8 +119,9 @@ public class TestVertexManager { InputDataInformationEvent diEvent2 = InputDataInformationEvent.createWithSerializedPayload(0, null); events2.add(diEvent2); - List tezEvents2 = - vm.onRootVertexInitialized("input1", id2, events2); + vm.onRootVertexInitialized("input1", id2, events2); + verify(mockHandler, times(2)).handle(requestCaptor.capture()); + List tezEvents2 = requestCaptor.getValue().getEvents(); assertEquals(tezEvents2.size(), 1); assertEquals(diEvent2, tezEvents2.get(0).getEvent()); } @@ -92,17 +133,11 @@ public class TestVertexManager { */ @Test(timeout = 5000) public void testOnRootVertexInitialized2() throws Exception { - Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS); - AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); - doReturn("vertex1").when(mockVertex).getName(); - when( - mockAppContext.getCurrentDAG().getVertex(any(String.class)) - .getTotalTasks()).thenReturn(1); - VertexManager vm = new VertexManager( VertexManagerPluginDescriptor.create(CustomVertexManager.class - .getName()), mockVertex, mockAppContext, mock(StateChangeNotifier.class)); + .getName()), UserGroupInformation.getCurrentUser(), + mockVertex, mockAppContext, mock(StateChangeNotifier.class)); vm.initialize(); InputDescriptor id1 = mock(InputDescriptor.class); List events1 = new LinkedList(); @@ -111,17 +146,20 @@ public class TestVertexManager { events1.add(diEvent1); // do not call context.addRootInputEvents, just cache the TezEvent - List tezEventsAfterInput1 = vm.onRootVertexInitialized("input1", id1, events1); + vm.onRootVertexInitialized("input1", id1, events1); + verify(mockHandler, times(1)).handle(requestCaptor.capture()); + List tezEventsAfterInput1 = requestCaptor.getValue().getEvents(); assertEquals(0, tezEventsAfterInput1.size()); - + InputDescriptor id2 = mock(InputDescriptor.class); List events2 = new LinkedList(); InputDataInformationEvent diEvent2 = InputDataInformationEvent.createWithSerializedPayload(0, null); events2.add(diEvent2); // call context.addRootInputEvents(input1), context.addRootInputEvents(input2) - List tezEventsAfterInput2 = - vm.onRootVertexInitialized("input2", id2, events2); + vm.onRootVertexInitialized("input2", id2, events2); + verify(mockHandler, times(2)).handle(requestCaptor.capture()); + List tezEventsAfterInput2 = requestCaptor.getValue().getEvents(); assertEquals(2, tezEventsAfterInput2.size()); // also verify the EventMetaData http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java index e2e9dd3..bbe9dcf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java @@ -170,7 +170,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin { public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception { numConfiguredSources++; int target = getContext().getInputVertexEdgeProperties().size(); - LOG.info("For vertex: " + getContext().getVertexName() + "Received configured signal from: " + LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: " + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources + " needed: " + target); Preconditions.checkState(numConfiguredSources <= target, "Vertex: " + getContext().getVertexName()); http://git-wip-us.apache.org/repos/asf/tez/blob/f0354689/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java index 8de747d..411ea71 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java @@ -243,12 +243,6 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(3); when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); - when(mockContext.getTaskContainer(mockSrcVertexId2, 0)).thenReturn(mockContainer2); - when(mockContext.getTaskContainer(mockSrcVertexId2, 1)).thenReturn(mockContainer2); - when(mockContext.getTaskContainer(mockSrcVertexId2, 2)).thenReturn(mockContainer2); - when(mockContext.getTaskContainer(mockSrcVertexId3, 0)).thenReturn(mockContainer3); - when(mockContext.getTaskContainer(mockSrcVertexId3, 1)).thenReturn(mockContainer3); - when(mockContext.getTaskContainer(mockSrcVertexId3, 2)).thenReturn(mockContainer3); mockInputVertices.put(mockSrcVertexId1, eProp1); mockInputVertices.put(mockSrcVertexId2, eProp2); mockInputVertices.put(mockSrcVertexId3, eProp3);