Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B9433182D3 for ; Thu, 18 Feb 2016 09:58:02 +0000 (UTC) Received: (qmail 68563 invoked by uid 500); 18 Feb 2016 09:57:56 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 68453 invoked by uid 500); 18 Feb 2016 09:57:56 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 68148 invoked by uid 99); 18 Feb 2016 09:57:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Feb 2016 09:57:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2A86CE0577; Thu, 18 Feb 2016 09:57:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sree@apache.org To: commits@tez.apache.org Date: Thu, 18 Feb 2016 09:58:16 -0000 Message-Id: In-Reply-To: <56a3befb33c349f7bf43e58a8338f007@git.apache.org> References: <56a3befb33c349f7bf43e58a8338f007@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/24] tez git commit: TEZ-3029. Add an onError method to service plugin contexts. (sseth) TEZ-3029. Add an onError method to service plugin contexts. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a812c346 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a812c346 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a812c346 Branch: refs/heads/TEZ-2980 Commit: a812c3462808e73b8a59e1852ff2547dcbafbf84 Parents: fec46aa Author: Siddharth Seth Authored: Wed Feb 17 13:39:11 2016 -0800 Committer: Siddharth Seth Committed: Wed Feb 17 13:39:11 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../api/ContainerLauncherContext.java | 12 +- .../apache/tez/serviceplugins/api/DagInfo.java | 30 +++ .../api/ServicePluginContextBase.java | 49 ++++ .../serviceplugins/api/ServicePluginError.java | 48 ++++ .../api/ServicePluginErrorDefaults.java | 76 ++++++ .../api/TaskSchedulerContext.java | 19 +- tez-dag/src/main/java/org/apache/tez/Utils.java | 33 +++ .../tez/dag/api/client/DAGClientHandler.java | 5 +- .../dag/app/ContainerLauncherContextImpl.java | 27 ++- .../org/apache/tez/dag/app/DAGAppMaster.java | 24 +- .../dag/app/TaskCommunicatorContextImpl.java | 17 ++ .../tez/dag/app/TaskCommunicatorManager.java | 26 ++ .../app/TaskCommunicatorManagerInterface.java | 4 + .../java/org/apache/tez/dag/app/dag/DAG.java | 3 +- .../tez/dag/app/dag/DAGTerminationCause.java | 3 + .../tez/dag/app/dag/VertexTerminationCause.java | 2 +- ...DAGAppMasterEventSchedulingServiceError.java | 15 +- .../dag/app/dag/event/DAGEventTerminateDag.java | 38 +++ .../tez/dag/app/dag/event/DAGEventType.java | 4 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 82 ++++--- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +- .../app/launcher/ContainerLauncherManager.java | 59 ++++- .../dag/app/rm/TaskSchedulerContextImpl.java | 22 +- .../app/rm/TaskSchedulerContextImplWrapper.java | 33 ++- .../tez/dag/app/rm/TaskSchedulerManager.java | 36 ++- .../dag/app/rm/YarnTaskSchedulerService.java | 6 +- .../app/rm/YarnTaskSchedulerServiceError.java | 33 +++ .../api/TaskCommunicatorContext.java | 16 +- .../dag/api/client/TestDAGClientHandler.java | 4 +- .../apache/tez/dag/app/MockDAGAppMaster.java | 6 +- .../tez/dag/app/TestMockDAGAppMaster.java | 3 +- .../dag/app/TestTaskCommunicatorManager.java | 136 ++++++++++- .../apache/tez/dag/app/dag/impl/TestCommit.java | 87 +++++-- .../tez/dag/app/dag/impl/TestDAGImpl.java | 84 +++++-- .../tez/dag/app/dag/impl/TestVertexImpl.java | 13 +- .../launcher/TestContainerLauncherManager.java | 101 +++++++- .../tez/dag/app/rm/TestTaskScheduler.java | 19 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 15 +- .../dag/app/rm/TestTaskSchedulerManager.java | 161 ++++++++++++ .../tez/dag/helpers/DagInfoImplForTest.java | 38 +++ .../tez/dag/app/ErrorPluginConfiguration.java | 134 ++++++++++ ...zTestServiceContainerLauncherWithErrors.java | 17 +- ...stServiceTaskSchedulerServiceWithErrors.java | 23 +- ...ezTestServiceTaskCommunicatorWithErrors.java | 22 +- .../tests/TestExternalTezServicesErrors.java | 243 +++++++++++++++---- 46 files changed, 1584 insertions(+), 247 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e2f77f6..af643dd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log Release 0.8.3: Unreleased INCOMPATIBLE CHANGES + TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: TEZ-3103. Shuffle can hang when memory to memory merging enabled http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java index 70a3498..ed1d58f 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java @@ -14,15 +14,15 @@ package org.apache.tez.serviceplugins.api; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.dag.api.UserPayload; @InterfaceAudience.Public @InterfaceStability.Unstable -public interface ContainerLauncherContext { +public interface ContainerLauncherContext extends ServicePluginContextBase { // TODO TEZ-2003 (post) TEZ-2664 Tez abstraction for ContainerId, NodeId, other YARN constructs @@ -77,13 +77,6 @@ public interface ContainerLauncherContext { // Lookup APIs /** - * Get the UserPayload that was configured while setting up the launcher - * - * @return the initially configured user payload - */ - UserPayload getInitialUserPayload(); - - /** * Get the number of nodes being handled by the specified source * * @param sourceName the relevant source name @@ -108,4 +101,5 @@ public interface ContainerLauncherContext { * */ Object getTaskCommunicatorMetaInfo(String taskCommName); + } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java new file mode 100644 index 0000000..ef73343 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +public interface DagInfo { + + /** + * The index of the current dag + * @return a numerical identifier for the DAG. This is unique within the currently running application. + */ + int getIndex(); + + /** + * Get the name of the dag + * @return the name of the dag + */ + String getName(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java new file mode 100644 index 0000000..90a51b2 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.tez.dag.api.UserPayload; + +/** + * Base interface for ServicePluginContexts + */ +public interface ServicePluginContextBase { + + /** + * Get the UserPayload that was configured while setting up the launcher + * + * @return the initially configured user payload + */ + UserPayload getInitialUserPayload(); + + /** + * Get information on the currently executing dag + * @return info on the currently running dag, or null if no dag is executing + */ + @Nullable + DagInfo getCurrentDagInfo(); + + /** + * Report an error from the service. This results in the specific DAG being killed. + * + * @param servicePluginError the error category + * @param message A diagnostic message associated with this error + * @param dagInfo the affected dag + */ + void reportError(@Nonnull ServicePluginError servicePluginError, String message, DagInfo dagInfo); +} http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java new file mode 100644 index 0000000..932c0fa --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +/** + * Represents errors from a ServicePlugin. The default implementation {@link ServicePluginErrorDefaults} + * lists a basic set of errors. + * This can be extended by implementing this interface, if the default set is not adequate + */ +public interface ServicePluginError { + + enum ErrorType { + TEMPORARY, PERMANENT, + } + + /** + * Get the enum representation + * + * @return an enum representation of the ServicePluginError + */ + Enum getEnum(); + + /** + * The type of the error + * + * @return the type of the error + */ + ErrorType getErrorType(); + + +} http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java new file mode 100644 index 0000000..83a85b5 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api;/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A default set of errors from ServicePlugins + * + * Errors are marked as fatal or non-fatal for the Application. + * Fatal errors cause the AM to go down. + * + */ +@InterfaceAudience.Public +public enum ServicePluginErrorDefaults implements ServicePluginError { + /** + * Indicates that the service is currently unavailable. + * This is a temporary error. + */ + SERVICE_UNAVAILABLE(ErrorType.TEMPORARY), + + /** Indicates that the service is in an inconsistent state. + * This is a fatal error. + */ + INCONSISTENT_STATE(ErrorType.PERMANENT), + + /** + * Other temporary error, + */ + OTHER(ErrorType.TEMPORARY), + + /** + * Other fatal error. + */ + OTHER_FATAL(ErrorType.PERMANENT); + + private ErrorType errorType; + + ServicePluginErrorDefaults(ErrorType errorType) { + this.errorType = errorType; + } + + @Override + public Enum getEnum() { + return this; + } + + @Override + public ErrorType getErrorType() { + return errorType; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java index a24061f..d30ada3 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java @@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ContainerSignatureMatcher; -import org.apache.tez.dag.api.UserPayload; /** * Context for a {@link TaskScheduler} @@ -42,7 +41,7 @@ import org.apache.tez.dag.api.UserPayload; */ @InterfaceAudience.Public @InterfaceStability.Unstable -public interface TaskSchedulerContext { +public interface TaskSchedulerContext extends ServicePluginContextBase { class AppFinalStatus { public final FinalApplicationStatus exitStatus; @@ -136,14 +135,6 @@ public interface TaskSchedulerContext { ); /** - * Indicate to the framework that the scheduler has run into an error. This will cause - * the DAG and application to be killed. - * - * @param t the relevant error - */ - void onError(Throwable t); - - /** * Inform the framework that the scheduler has determined that a previously allocated container * needs to be preempted * @@ -164,13 +155,6 @@ public interface TaskSchedulerContext { // Getters /** - * Get the UserPayload that was configured while setting up the scheduler - * - * @return the initially configured user payload - */ - UserPayload getInitialUserPayload(); - - /** * Get the tracking URL for the application. Primarily relevant to YARN * * @return the trackingUrl for the app @@ -234,4 +218,5 @@ public interface TaskSchedulerContext { * @return the app master state */ AMState getAMState(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/Utils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java index 959b536..6f03a67 100644 --- a/tez-dag/src/main/java/org/apache/tez/Utils.java +++ b/tez-dag/src/main/java/org/apache/tez/Utils.java @@ -15,7 +15,14 @@ package org.apache.tez; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.yarn.event.Event; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.DAGTerminationCause; +import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,4 +70,30 @@ public class Utils { return "[" + schedulerIndex + ":" + name + "]"; } + public static void processNonFatalServiceErrorReport(String entityString, + ServicePluginError servicePluginError, + String diagnostics, + DagInfo dagInfo, AppContext appContext, + String componentName) { + String message = "Error reported by " + componentName + " [" + + entityString + "][" + + servicePluginError + + "] " + (diagnostics == null ? "" : diagnostics); + if (dagInfo != null) { + DAG dag = appContext.getCurrentDAG(); + if (dag != null && dag.getID().getId() == dagInfo.getIndex()) { + TezDAGID dagId = dag.getID(); + // Send a kill message only if it is the same dag. + LOG.warn(message + ", Failing dag: [" + dagInfo.getName() + ", " + dagId + "]"); + sendEvent(appContext, new DAGEventTerminateDag(dagId, DAGTerminationCause.SERVICE_PLUGIN_ERROR, message)); + } + } else { + LOG.warn("No current dag name provided. Not acting on " + message); + } + } + + @SuppressWarnings("unchecked") + private static void sendEvent(AppContext appContext, Event event) { + appContext.getEventHandler().handle(event); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java index 0f674f3..79b9acd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java @@ -112,7 +112,7 @@ public class DAGClientHandler { public void tryKillDAG(String dagIdStr) throws TezException { DAG dag = getDAG(dagIdStr); LOG.info("Sending client kill to dag: " + dagIdStr); - dagAppMaster.tryKillDAG(dag); + dagAppMaster.tryKillDAG(dag, "Kill Dag request received from client"); } public synchronized String submitDAG(DAGPlan dagPlan, @@ -120,10 +120,11 @@ public class DAGClientHandler { return dagAppMaster.submitDAGToAppMaster(dagPlan, additionalAmResources); } + // Only to be invoked by the DAGClient. public synchronized void shutdownAM() throws TezException { LOG.info("Received message to shutdown AM"); if (dagAppMaster != null) { - dagAppMaster.shutdownTezAM(); + dagAppMaster.shutdownTezAM("AM Shutdown request received from client"); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java index 9434256..7e68675 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java @@ -14,6 +14,8 @@ package org.apache.tez.dag.app; +import javax.annotation.Nullable; + import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -21,7 +23,10 @@ import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; +import org.apache.tez.dag.app.launcher.ContainerLauncherManager; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.rm.container.AMContainerEvent; import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; @@ -39,15 +44,22 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext { private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherContextImpl.class); private final AppContext context; + private final ContainerLauncherManager containerLauncherManager; private final TaskCommunicatorManagerInterface tal; private final UserPayload initialUserPayload; + private final int containerLauncherIndex; - public ContainerLauncherContextImpl(AppContext appContext, TaskCommunicatorManagerInterface tal, UserPayload initialUserPayload) { + public ContainerLauncherContextImpl(AppContext appContext, ContainerLauncherManager containerLauncherManager, + TaskCommunicatorManagerInterface tal, + UserPayload initialUserPayload, int containerLauncherIndex) { Preconditions.checkNotNull(appContext, "AppContext cannot be null"); + Preconditions.checkNotNull(appContext, "ContainerLauncherManager cannot be null"); Preconditions.checkNotNull(tal, "TaskCommunicator cannot be null"); this.context = appContext; + this.containerLauncherManager = containerLauncherManager; this.tal = tal; this.initialUserPayload = initialUserPayload; + this.containerLauncherIndex = containerLauncherIndex; } @Override @@ -103,6 +115,12 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext { return context.getApplicationAttemptId(); } + @Nullable + @Override + public DagInfo getCurrentDagInfo() { + return context.getCurrentDAG(); + } + @Override public Object getTaskCommunicatorMetaInfo(String taskCommName) { int taskCommId = context.getTaskCommunicatorIdentifier(taskCommName); @@ -120,4 +138,11 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext { return null; } + @Override + public void reportError(ServicePluginError servicePluginError, String message, DagInfo dagInfo) { + Preconditions.checkNotNull(servicePluginError, "ServiceError must be specified"); + containerLauncherManager.reportError(containerLauncherIndex, servicePluginError, message, dagInfo); + } + + } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 579d23f..5ac3800 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -72,9 +72,11 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; +import org.apache.tez.dag.app.dag.DAGTerminationCause; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.dag.app.dag.event.DAGEventInternalError; +import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -711,8 +713,8 @@ public class DAGAppMaster extends AbstractService { state = DAGAppMasterState.ERROR; errDiagnostics = "Error in the TaskScheduler. Shutting down. "; addDiagnostic(errDiagnostics - + "Error=" + ExceptionUtils.getStackTrace(schedulingServiceErrorEvent.getThrowable())); - LOG.error(errDiagnostics, schedulingServiceErrorEvent.getThrowable()); + + "Error=" + schedulingServiceErrorEvent.getDiagnosticInfo()); + LOG.error(errDiagnostics); shutdownHandler.shutdown(); break; case TASK_COMMUNICATOR_SERVICE_FATAL_ERROR: @@ -724,7 +726,7 @@ public class DAGAppMaster extends AbstractService { Throwable error = usfe.getError(); errDiagnostics = "Service Error: " + usfe.getDiagnosticInfo() + ", eventType=" + event.getType() - + ", exception=" + ExceptionUtils.getStackTrace(usfe.getError()); + + ", exception=" + (usfe.getError() == null ? "None" : ExceptionUtils.getStackTrace(usfe.getError())); LOG.error(errDiagnostics, error); addDiagnostic(errDiagnostics); @@ -1291,16 +1293,16 @@ public class DAGAppMaster extends AbstractService { + oldState + " new state: " + state); } - public void shutdownTezAM() throws TezException { + public void shutdownTezAM(String dagKillmessage) throws TezException { sessionStopped.set(true); synchronized (this) { this.taskSchedulerManager.setShouldUnregisterFlag(); if (currentDAG != null && !currentDAG.isComplete()) { - //send a DAG_KILL message + //send a DAG_TERMINATE message LOG.info("Sending a kill event to the current DAG" + ", dagId=" + currentDAG.getID()); - tryKillDAG(currentDAG); + tryKillDAG(currentDAG, dagKillmessage); } else { LOG.info("No current running DAG, shutting down the AM"); if (isSession && !state.equals(DAGAppMasterState.ERROR)) { @@ -1376,13 +1378,13 @@ public class DAGAppMaster extends AbstractService { } @SuppressWarnings("unchecked") - public void tryKillDAG(DAG dag) throws TezException { + public void tryKillDAG(DAG dag, String message) throws TezException { try { logDAGKillRequestEvent(dag.getID(), false); } catch (IOException e) { throw new TezException(e); } - dispatcher.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); + dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, message)); } private Map getAdditionalLocalResourceDiff( @@ -2235,10 +2237,10 @@ public class DAGAppMaster extends AbstractService { if (currentTime < (lastDAGCompletionTime + sessionTimeoutInterval)) { return; } - LOG.info("Session timed out" + String message = "Session timed out" + ", lastDAGCompletionTime=" + lastDAGCompletionTime + " ms" - + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms"); - shutdownTezAM(); + + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms"; + shutdownTezAM(message); } public boolean isSession() { http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 7f88be2..a922f38 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -14,6 +14,7 @@ package org.apache.tez.dag.app; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.util.Set; @@ -28,6 +29,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.rm.container.AMContainer; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest; @@ -143,6 +146,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver this); } + @SuppressWarnings("deprecation") @Override public String getCurrentDagName() { return getDag().getName(); @@ -153,11 +157,18 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver return context.getApplicationID().toString(); } + @SuppressWarnings("deprecation") @Override public int getCurrentDagIdenitifer() { return getDag().getID().getId(); } + @Nullable + @Override + public DagInfo getCurrentDagInfo() { + return getDag(); + } + @Override public Iterable getInputVertexNames(String vertexName) { Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); @@ -203,6 +214,12 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver } @Override + public void reportError(@Nonnull ServicePluginError servicePluginError, String message, DagInfo dagInfo) { + Preconditions.checkNotNull(servicePluginError, "ServicePluginError must be set"); + taskCommunicatorManager.reportError(taskCommunicatorIndex, servicePluginError, message, dagInfo); + } + + @Override public void onStateUpdated(VertexStateUpdate event) { taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex); } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index a196114..403e1a1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -33,6 +33,8 @@ import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.yarn.event.Event; import org.apache.tez.Utils; import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.UserPayload; @@ -593,6 +595,30 @@ public class TaskCommunicatorManager extends AbstractService implements return taskCommunicators[taskCommIndex]; } + @Override + public void reportError(int taskCommIndex, ServicePluginError servicePluginError, + String diagnostics, + DagInfo dagInfo) { + if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) { + String msg = "Fatal Error reported by TaskCommunicator" + + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommIndex, context) + + ", servicePluginError=" + servicePluginError + + ", diagnostics= " + (diagnostics == null ? "" : diagnostics); + LOG.error(msg + ", Diagnostics=" + diagnostics); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, null)); + } else { + Utils + .processNonFatalServiceErrorReport( + Utils.getTaskCommIdentifierString(taskCommIndex, context), servicePluginError, + diagnostics, + dagInfo, context, + "TaskCommunicator"); + } + } + private void pingContainerHeartbeatHandler(ContainerId containerId) { containerHeartbeatHandler.pinged(containerId); } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java index e07b1a0..e0f9852 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java @@ -20,6 +20,8 @@ package org.apache.tez.dag.app; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.container.AMContainerTask; @@ -42,4 +44,6 @@ public interface TaskCommunicatorManagerInterface { void dagSubmitted(); TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex); + + void reportError(int taskCommIndex, ServicePluginError servicePluginError, String diagnostics, DagInfo dagName); } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index a01c623..dd96ab2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -36,11 +36,12 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.common.security.ACLManager; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.serviceplugins.api.DagInfo; /** * Main interface to interact with the job. */ -public interface DAG { +public interface DAG extends DagInfo { TezDAGID getID(); Map getLocalResources(); http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java index b6be395..b73cbe6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java @@ -26,6 +26,9 @@ public enum DAGTerminationCause { /** DAG was directly killed. */ DAG_KILL(DAGState.KILLED), + + /** A service plugin indicated an error */ + SERVICE_PLUGIN_ERROR(DAGState.FAILED), /** A vertex failed. */ VERTEX_FAILURE(DAGState.FAILED), http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java index 816f85a..49be74d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java @@ -24,7 +24,7 @@ package org.apache.tez.dag.app.dag; public enum VertexTerminationCause { /** DAG was killed */ - DAG_KILL(VertexState.KILLED), + DAG_TERMINATED(VertexState.KILLED), /** Other vertex failed causing DAG to fail thus killing this vertex */ OTHER_VERTEX_FAILURE(VertexState.KILLED), http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java index 16625df..cf49d20 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java @@ -18,17 +18,18 @@ package org.apache.tez.dag.app.dag.event; -public class DAGAppMasterEventSchedulingServiceError extends DAGAppMasterEvent { +public class DAGAppMasterEventSchedulingServiceError extends DAGAppMasterEvent + implements DiagnosableEvent { - private final Throwable throwable; + private final String diagnostics; - public DAGAppMasterEventSchedulingServiceError(Throwable t) { + public DAGAppMasterEventSchedulingServiceError(String diagnostics) { super(DAGAppMasterEventType.SCHEDULING_SERVICE_ERROR); - this.throwable = t; + this.diagnostics = diagnostics; } - public Throwable getThrowable() { - return throwable; + @Override + public String getDiagnosticInfo() { + return diagnostics; } - } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java new file mode 100644 index 0000000..1286e11 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.app.dag.DAGTerminationCause; +import org.apache.tez.dag.records.TezDAGID; + +public class DAGEventTerminateDag extends DAGEvent implements DiagnosableEvent { + private final String diagMessage; + private final DAGTerminationCause terminationCause; + + public DAGEventTerminateDag(TezDAGID dagId, DAGTerminationCause terminationCause, String message) { + super(dagId, DAGEventType.DAG_TERMINATE); + this.diagMessage = message; + this.terminationCause = terminationCause; + } + + @Override + public String getDiagnosticInfo() { + return diagMessage; + } + + public DAGTerminationCause getTerminationCause() { + return terminationCause; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java index ea6a3cc..bf3b30a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java @@ -23,8 +23,8 @@ package org.apache.tez.dag.app.dag.event; */ public enum DAGEventType { - //Producer:Client - DAG_KILL, + //Producer: ServicePluginManagers , Client (KILL) + DAG_TERMINATE, //Producer:AM DAG_INIT, http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 88dfe27..a6c6c02 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -43,7 +43,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.LimitExceededException; -import org.apache.tez.dag.app.dag.event.DAGEventInternalError; +import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.dag.app.dag.event.DiagnosableEvent; import org.apache.tez.state.OnStateChangedCallback; import org.apache.tez.state.StateMachineTez; @@ -253,8 +253,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, EnumSet.of(DAGState.INITED, DAGState.FAILED), DAGEventType.DAG_INIT, new InitTransition()) - .addTransition(DAGState.NEW, DAGState.KILLED, - DAGEventType.DAG_KILL, + .addTransition(DAGState.NEW, EnumSet.of(DAGState.KILLED, DAGState.FAILED), + DAGEventType.DAG_TERMINATE, new KillNewJobTransition()) .addTransition(DAGState.NEW, DAGState.ERROR, DAGEventType.INTERNAL_ERROR, @@ -269,8 +269,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, .addTransition(DAGState.INITED, DAGState.RUNNING, DAGEventType.DAG_START, new StartTransition()) - .addTransition(DAGState.INITED, DAGState.KILLED, - DAGEventType.DAG_KILL, + .addTransition(DAGState.INITED, EnumSet.of(DAGState.KILLED, DAGState.FAILED), + DAGEventType.DAG_TERMINATE, new KillInitedJobTransition()) .addTransition(DAGState.INITED, DAGState.ERROR, DAGEventType.INTERNAL_ERROR, @@ -287,7 +287,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, DAGEventType.DAG_VERTEX_RERUNNING, new VertexReRunningTransition()) .addTransition(DAGState.RUNNING, DAGState.TERMINATING, - DAGEventType.DAG_KILL, new DAGKilledTransition()) + DAGEventType.DAG_TERMINATE, new DAGKilledTransition()) .addTransition(DAGState.RUNNING, DAGState.RUNNING, DAGEventType.DAG_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) @@ -311,7 +311,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, DAGEventType.DAG_COMMIT_COMPLETED, COMMIT_COMPLETED_TRANSITION) .addTransition(DAGState.COMMITTING, DAGState.TERMINATING, - DAGEventType.DAG_KILL, + DAGEventType.DAG_TERMINATE, new DAGKilledWhileCommittingTransition()) .addTransition( DAGState.COMMITTING, @@ -354,7 +354,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // Ignore-able events .addTransition(DAGState.TERMINATING, DAGState.TERMINATING, - EnumSet.of(DAGEventType.DAG_KILL, + EnumSet.of(DAGEventType.DAG_TERMINATE, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE)) @@ -370,7 +370,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, INTERNAL_ERROR_TRANSITION) // Ignore-able events .addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED, - EnumSet.of(DAGEventType.DAG_KILL, + EnumSet.of(DAGEventType.DAG_TERMINATE, DAGEventType.DAG_SCHEDULER_UPDATE, DAGEventType.DAG_VERTEX_COMPLETED)) @@ -386,7 +386,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, INTERNAL_ERROR_TRANSITION) // Ignore-able events .addTransition(DAGState.FAILED, DAGState.FAILED, - EnumSet.of(DAGEventType.DAG_KILL, + EnumSet.of(DAGEventType.DAG_TERMINATE, DAGEventType.DAG_START, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE, @@ -404,7 +404,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, INTERNAL_ERROR_TRANSITION) // Ignore-able events .addTransition(DAGState.KILLED, DAGState.KILLED, - EnumSet.of(DAGEventType.DAG_KILL, + EnumSet.of(DAGEventType.DAG_TERMINATE, DAGEventType.DAG_START, DAGEventType.DAG_VERTEX_RERUNNING, DAGEventType.DAG_SCHEDULER_UPDATE, @@ -415,7 +415,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, DAGState.ERROR, DAGState.ERROR, EnumSet.of( - DAGEventType.DAG_KILL, + DAGEventType.DAG_TERMINATE, DAGEventType.DAG_INIT, DAGEventType.DAG_START, DAGEventType.DAG_VERTEX_COMPLETED, @@ -1424,6 +1424,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override + public int getIndex() { + return dagId.getId(); + } + + @Override public String getName() { return dagName; } @@ -1836,28 +1841,41 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } + private void addDiagnostics(DiagnosableEvent event) { + if (event.getDiagnosticInfo() != null && !event.getDiagnosticInfo().isEmpty()) { + addDiagnostic(event.getDiagnosticInfo()); + } + } + // Task-start has been moved out of InitTransition, so this arc simply // hardcodes 0 for both map and reduce finished tasks. - private static class KillNewJobTransition - implements SingleArcTransition { + private static class KillNewJobTransition implements + MultipleArcTransition { @Override - public void transition(DAGImpl dag, DAGEvent dagEvent) { + public DAGState transition(DAGImpl dag, DAGEvent dagEvent) { + DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent; dag.setFinishTime(); - dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL); - dag.finished(DAGState.KILLED); + dag.trySetTerminationCause(event.getTerminationCause()); + dag.addDiagnostic("Dag received [" + event.getType() + ", " + event.getTerminationCause() + + "] in NEW state."); + dag.addDiagnostics(event); + return dag.finished(event.getTerminationCause().getFinishedState()); } } - private static class KillInitedJobTransition - implements SingleArcTransition { + private static class KillInitedJobTransition implements + MultipleArcTransition { @Override - public void transition(DAGImpl dag, DAGEvent dagEvent) { - dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL); - dag.addDiagnostic("Job received Kill in INITED state."); - dag.finished(DAGState.KILLED); + public DAGState transition(DAGImpl dag, DAGEvent dagEvent) { + DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent; + dag.trySetTerminationCause(event.getTerminationCause()); + dag.addDiagnostic("Dag received [" + event.getType() + ", " + event.getTerminationCause() + + "] in INITED state."); + dag.addDiagnostics(event); + return dag.finished(event.getTerminationCause().getFinishedState()); } } @@ -1865,11 +1883,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private static class DAGKilledTransition implements SingleArcTransition { @Override - public void transition(DAGImpl job, DAGEvent event) { - String msg = "Job received Kill while in RUNNING state."; + public void transition(DAGImpl job, DAGEvent dagEvent) { + DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent; + String msg = "Dag received [" + event.getType() + ", " + event.getTerminationCause() + + "] in RUNNING state."; LOG.info(msg); job.addDiagnostic(msg); - job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL); + job.addDiagnostics(event); + job.enactKill(event.getTerminationCause(), VertexTerminationCause.DAG_TERMINATED); // Commit may happen when dag is still in RUNNING (vertex group commit) job.cancelCommits(); // TODO Metrics @@ -1883,12 +1904,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, implements SingleArcTransition { @Override - public void transition(DAGImpl dag, DAGEvent event) { - String diag = "DAG received Kill while in COMMITTING state."; + public void transition(DAGImpl dag, DAGEvent dagEvent) { + DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent; + String diag = "Dag received [" + event.getType() + ", " + event.getTerminationCause() + + "] in COMMITTING state."; LOG.info(diag); dag.addDiagnostic(diag); + dag.addDiagnostics(event); dag.cancelCommits(); - dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL); + dag.trySetTerminationCause(event.getTerminationCause()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 065974e..c8f217b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -3206,7 +3206,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexEventTermination vet = (VertexEventTermination) event; VertexTerminationCause trigger = vet.getTerminationCause(); switch(trigger){ - case DAG_KILL : vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break; + case DAG_TERMINATED: vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break; case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break; case ROOT_INPUT_INIT_FAILURE: case COMMIT_FAILURE: http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index 98237c1..250afd8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -44,6 +44,8 @@ import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.ContainerLauncherEvent; import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,16 +60,9 @@ public class ContainerLauncherManager extends AbstractService final ContainerLauncherContext containerLauncherContexts[]; protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers; private final AppContext appContext; + private final boolean isIncompleteCtor; + - @VisibleForTesting - public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) { - super(ContainerLauncherManager.class.getName()); - this.appContext = context; - containerLaunchers = new ContainerLauncherWrapper[] {new ContainerLauncherWrapper(containerLauncher)}; - containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()}; - containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{ - new ServicePluginLifecycleAbstractService<>(containerLauncher)}; - } // Accepting conf to setup final parameters, if required. public ContainerLauncherManager(AppContext context, @@ -77,6 +72,7 @@ public class ContainerLauncherManager extends AbstractService boolean isPureLocalMode) throws TezException { super(ContainerLauncherManager.class.getName()); + this.isIncompleteCtor = false; this.appContext = context; Preconditions.checkArgument( containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(), @@ -89,7 +85,7 @@ public class ContainerLauncherManager extends AbstractService for (int i = 0; i < containerLauncherDescriptors.size(); i++) { UserPayload userPayload = containerLauncherDescriptors.get(i).getUserPayload(); ContainerLauncherContext containerLauncherContext = - new ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload); + new ContainerLauncherContextImpl(context, this, taskCommunicatorManagerInterface, userPayload, i); containerLauncherContexts[i] = containerLauncherContext; containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i), context, containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode)); @@ -98,6 +94,25 @@ public class ContainerLauncherManager extends AbstractService } @VisibleForTesting + public ContainerLauncherManager(AppContext context) { + super(ContainerLauncherManager.class.getName()); + this.isIncompleteCtor = true; + this.appContext = context; + containerLaunchers = new ContainerLauncherWrapper[1]; + containerLauncherContexts = new ContainerLauncherContext[1]; + containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[1]; + } + + // To be used with the constructor which accepts the AppContext only, and is for testing. + @VisibleForTesting + public void setContainerLauncher(ContainerLauncher containerLauncher) { + Preconditions.checkState(isIncompleteCtor == true, "Can only be used with the Test constructor"); + containerLaunchers[0] = new ContainerLauncherWrapper(containerLauncher); + containerLauncherContexts[0] = containerLauncher.getContext(); + containerLauncherServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(containerLauncher); + } + + @VisibleForTesting ContainerLauncher createContainerLauncher( NamedEntityDescriptor containerLauncherDescriptor, AppContext context, @@ -236,6 +251,30 @@ public class ContainerLauncherManager extends AbstractService } } + public void reportError(int containerLauncherIndex, ServicePluginError servicePluginError, + String diagnostics, + DagInfo dagInfo) { + if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) { + String msg = "Fatal Error reported by ContainerLauncher" + + ", containerLauncher=" + + Utils.getContainerLauncherIdentifierString(containerLauncherIndex, appContext) + + ", servicePluginError=" + servicePluginError + + ", diagnostics= " + (diagnostics == null ? "" : diagnostics); + LOG.error(msg); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, + msg, null)); + } else { + Utils + .processNonFatalServiceErrorReport( + Utils.getContainerLauncherIdentifierString(containerLauncherIndex, appContext), + servicePluginError, + diagnostics, dagInfo, + appContext, "ContainerLauncher"); + } + } + @SuppressWarnings("unchecked") private void sendEvent(Event event) { appContext.getEventHandler().handle(event); http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java index 37aa96b..fb4198b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java @@ -14,10 +14,12 @@ package org.apache.tez.dag.app.rm; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -29,6 +31,8 @@ import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; public class TaskSchedulerContextImpl implements TaskSchedulerContext { @@ -94,11 +98,6 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { } @Override - public void onError(Throwable t) { - taskSchedulerManager.onError(schedulerId, t); - } - - @Override public float getProgress() { return taskSchedulerManager.getProgress(schedulerId); } @@ -139,6 +138,12 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { return appContext.getApplicationAttemptId(); } + @Nullable + @Override + public DagInfo getCurrentDagInfo() { + return appContext.getCurrentDAG(); + } + @Override public String getAppHostName() { return appHostName; @@ -175,4 +180,11 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext { throw new TezUncheckedException("Unexpected state " + appContext.getAMState()); } } + + @Override + public void reportError(ServicePluginError servicePluginError, String diagnostics, + DagInfo dagInfo) { + Preconditions.checkNotNull(servicePluginError, "ServicePluginError must be specified"); + taskSchedulerManager.reportError(schedulerId, servicePluginError, diagnostics, dagInfo); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java index 9e4c8e0..7e1988b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java @@ -18,6 +18,8 @@ package org.apache.tez.dag.app.rm; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -37,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; /** @@ -97,8 +101,9 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext { } @Override - public void onError(Throwable t) { - executorService.submit(new OnErrorCallable(real, t)); + public void reportError(@Nonnull ServicePluginError servicePluginError, String message, + DagInfo dagInfo) { + executorService.submit(new ReportErrorCallable(real, servicePluginError, message, dagInfo)); } @Override @@ -156,6 +161,12 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext { return real.getApplicationAttemptId(); } + @Nullable + @Override + public DagInfo getCurrentDagInfo() { + return real.getCurrentDagInfo(); + } + @Override public String getAppHostName() { return real.getAppHostName(); @@ -175,6 +186,7 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext { public AMState getAMState() { return real.getAMState(); } + // End of getters which do not need to go through a thread. Underlying implementation // does not use locks. @@ -301,19 +313,24 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext { } } - static class OnErrorCallable extends TaskSchedulerContextCallbackBase implements - Callable { + static class ReportErrorCallable extends TaskSchedulerContextCallbackBase implements Callable { - private final Throwable throwable; + private final ServicePluginError servicePluginError; + private final String message; + private final DagInfo dagInfo; - public OnErrorCallable(TaskSchedulerContext app, Throwable throwable) { + public ReportErrorCallable(TaskSchedulerContext app, + ServicePluginError servicePluginError, String message, + DagInfo dagInfo) { super(app); - this.throwable = throwable; + this.servicePluginError = servicePluginError; + this.message = message; + this.dagInfo = dagInfo; } @Override public Void call() throws Exception { - app.onError(throwable); + app.reportError(servicePluginError, message, dagInfo); return null; } } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index fa9fb81..5317440 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -38,6 +38,8 @@ import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; +import org.apache.tez.serviceplugins.api.DagInfo; +import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; @@ -844,9 +846,36 @@ public class TaskSchedulerManager extends AbstractService implements return dagAppMaster.getProgress(); } - public void onError(int schedulerId, Throwable t) { - LOG.info("Error reported by scheduler {} - {}", schedulerId, t); - sendEvent(new DAGAppMasterEventSchedulingServiceError(t)); + public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError, + String diagnostics, + DagInfo dagInfo) { + if (servicePluginError == YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR) { + LOG.info("Error reported by scheduler {} - {}", + Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext) + ": " + + diagnostics); + if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName() + .equals(YarnTaskSchedulerService.class.getName())) { + LOG.warn( + "Reporting a SchedulerServiceError to the DAGAppMaster since the error" + + " was reported by the default YARN Task Scheduler"); + sendEvent(new DAGAppMasterEventSchedulingServiceError(diagnostics)); + } + } else if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) { + String msg = "Fatal error reported by TaskScheduler" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext) + + ", servicePluginError=" + servicePluginError + + ", diagnostics= " + (diagnostics == null ? "" : diagnostics); + LOG.error(msg); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, null)); + } else { + Utils.processNonFatalServiceErrorReport( + Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext), + servicePluginError, diagnostics, dagInfo, + appContext, "TaskScheduler"); + } } public void dagCompleted() { @@ -964,5 +993,4 @@ public class TaskSchedulerManager extends AbstractService implements return historyUrl; } - } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 1f05064..c1c363b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -35,7 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -916,7 +916,9 @@ public class YarnTaskSchedulerService extends TaskScheduler LOG.error("Got TaskSchedulerError, " + ExceptionUtils.getStackTrace(t)); return; } - getContext().onError(t); + LOG.error("Got Error from RMClient", t); + getContext().reportError(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR, StringUtils.stringifyException(t), + null); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java new file mode 100644 index 0000000..e8017dd --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import org.apache.tez.serviceplugins.api.ServicePluginError; + +public enum YarnTaskSchedulerServiceError implements ServicePluginError { + + RESOURCEMANAGER_ERROR; + + @Override + public Enum getEnum() { + return this; + } + + @Override + public ErrorType getErrorType() { + return ErrorType.PERMANENT; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java index c55bdbd..c551b09 100644 --- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java @@ -36,7 +36,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -44,7 +43,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; // Do not make calls into this from within a held lock. // TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module -public interface TaskCommunicatorContext { +public interface TaskCommunicatorContext extends ServicePluginContextBase { // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API // - Consolidate usage of IDs @@ -57,12 +56,6 @@ public interface TaskCommunicatorContext { // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. // - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification) - /** - * Get the UserPayload that was configured while setting up the task communicator - * - * @return the initially configured user payload - */ - UserPayload getInitialUserPayload(); /** * Get the application attempt id for the running application. Relevant when running under YARN @@ -170,11 +163,14 @@ public interface TaskCommunicatorContext { */ void registerForVertexStateUpdates(String vertexName, @Nullable Set stateSet); + // TODO TEZ-3120 Remove deprecated methods /** * Get the name of the currently executing dag * * @return the name of the currently executing dag + * @deprecated replaced by {@link TaskCommunicatorContext#getCurrentDagInfo} */ + @Deprecated String getCurrentDagName(); /** @@ -183,10 +179,13 @@ public interface TaskCommunicatorContext { */ String getCurrentAppIdentifier(); + // TODO TEZ-3120 Remove deprecated methods /** * Get the identifier for the currently executing dag. * @return a numerical identifier for the currently running DAG. This is unique within the currently running application. + * @deprecated replaced by {@link TaskCommunicatorContext#getCurrentDagInfo} */ + @Deprecated int getCurrentDagIdenitifer(); /** @@ -237,4 +236,5 @@ public interface TaskCommunicatorContext { * @return time when the current dag started executing */ long getDagStartTime(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java index 80414ba..23a5191 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java @@ -112,7 +112,7 @@ public class TestDAGClientHandler { } dagClientHandler.tryKillDAG("dag_9999_0001_1"); ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(DAG.class); - verify(mockDagAM, times(1)).tryKillDAG(eventCaptor.capture()); + verify(mockDagAM, times(1)).tryKillDAG(eventCaptor.capture(), eq("Kill Dag request received from client")); assertEquals(1, eventCaptor.getAllValues().size()); assertTrue(eventCaptor.getAllValues().get(0) instanceof DAG); assertEquals("dag_9999_0001_1", ((DAG)eventCaptor.getAllValues().get(0)).getID().toString()); @@ -125,7 +125,7 @@ public class TestDAGClientHandler { // shutdown dagClientHandler.shutdownAM(); - verify(mockDagAM).shutdownTezAM(); + verify(mockDagAM).shutdownTezAM(eq("AM Shutdown request received from client")); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 08f81fb..b021a36 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -520,10 +520,12 @@ public class MockDAGAppMaster extends DAGAppMaster { } catch (IOException e) { throw new TezUncheckedException(e); } + ContainerLauncherManager clManager = new ContainerLauncherManager(getContext()); ContainerLauncherContext containerLauncherContext = - new ContainerLauncherContextImpl(getContext(), getTaskCommunicatorManager(), userPayload); + new ContainerLauncherContextImpl(getContext(), clManager, getTaskCommunicatorManager(), userPayload, 0); containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext); - return new ContainerLauncherManager(containerLauncher, getContext()); + clManager.setContainerLauncher(containerLauncher); + return clManager; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index d5ee67d..74ac51e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -854,7 +854,8 @@ public class TestMockDAGAppMaster { tezClient.submitDAG(dag); mockLauncher.waitTillContainersLaunched(); - mockApp.handle(new DAGAppMasterEventSchedulingServiceError(new RuntimeException("Mock error"))); + mockApp.handle(new DAGAppMasterEventSchedulingServiceError( + org.apache.hadoop.util.StringUtils.stringifyException(new RuntimeException("Mock error")))); while(!mockApp.getShutdownHandler().wasShutdownInvoked()) { Thread.sleep(100); http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java index 5323928..c7f97d3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java @@ -23,11 +23,13 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; @@ -49,6 +52,11 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; +import org.apache.tez.dag.helpers.DagInfoImplForTest; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; +import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TezConstants; @@ -62,6 +70,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -73,7 +82,7 @@ public class TestTaskCommunicatorManager { @Before @After - public void reset() { + public void resetForNextTest() { TaskCommManagerForMultipleCommTest.reset(); } @@ -233,6 +242,71 @@ public class TestTaskCommunicatorManager { @SuppressWarnings("unchecked") @Test(timeout = 5000) + public void testReportFailureFromTaskCommunicator() throws TezException { + String dagName = DAG_NAME; + EventHandler eventHandler = mock(EventHandler.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + doReturn("testTaskCommunicator").when(appContext).getTaskCommunicatorName(0); + doReturn(eventHandler).when(appContext).getEventHandler(); + + DAG dag = mock(DAG.class); + TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(1, 0), DAG_INDEX); + doReturn(dagName).when(dag).getName(); + doReturn(dagId).when(dag).getID(); + doReturn(dag).when(appContext).getCurrentDAG(); + + NamedEntityDescriptor namedEntityDescriptor = + new NamedEntityDescriptor<>("testTaskCommunicator", TaskCommForFailureTest.class.getName()); + List list = new LinkedList<>(); + list.add(namedEntityDescriptor); + + + TaskCommunicatorManager taskCommManager = + new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class), + mock(ContainerHeartbeatHandler.class), list); + try { + taskCommManager.init(new Configuration()); + taskCommManager.start(); + + taskCommManager.registerRunningContainer(mock(ContainerId.class), 0); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + + Event rawEvent = argumentCaptor.getValue(); + assertTrue(rawEvent instanceof DAGEventTerminateDag); + DAGEventTerminateDag killEvent = (DAGEventTerminateDag) rawEvent; + assertTrue(killEvent.getDiagnosticInfo().contains("ReportError")); + assertTrue(killEvent.getDiagnosticInfo() + .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name())); + assertTrue(killEvent.getDiagnosticInfo().contains("[0:testTaskCommunicator]")); + + + reset(eventHandler); + + taskCommManager.dagComplete(dag); + + argumentCaptor = ArgumentCaptor.forClass(Event.class); + + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + rawEvent = argumentCaptor.getValue(); + + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + DAGAppMasterEventUserServiceFatalError event = + (DAGAppMasterEventUserServiceFatalError) rawEvent; + assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, event.getType()); + assertTrue(event.getDiagnosticInfo().contains("ReportedFatalError")); + assertTrue( + event.getDiagnosticInfo().contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name())); + assertTrue(event.getDiagnosticInfo().contains("[0:testTaskCommunicator]")); + + } finally { + taskCommManager.stop(); + } + + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testTaskCommunicatorUserError() { TaskCommunicatorContextImpl taskCommContext = mock(TaskCommunicatorContextImpl.class); TaskCommunicator taskCommunicator = mock(TaskCommunicator.class, new ExceptionAnswer()); @@ -313,7 +387,6 @@ public class TestTaskCommunicatorManager { } } - static class TaskCommManagerForMultipleCommTest extends TaskCommunicatorManager { // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor, @@ -460,4 +533,63 @@ public class TestTaskCommunicatorManager { return null; } } + + private static final String DAG_NAME = "dagName"; + private static final int DAG_INDEX = 1; + public static class TaskCommForFailureTest extends TaskCommunicator { + + public TaskCommForFailureTest( + TaskCommunicatorContext taskCommunicatorContext) { + super(taskCommunicatorContext); + } + + @Override + public void registerRunningContainer(ContainerId containerId, String hostname, int port) throws + ServicePluginException { + getContext() + .reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME)); + } + + @Override + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, + @Nullable String diagnostics) throws ServicePluginException { + + } + + @Override + public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, + Map additionalResources, + Credentials credentials, boolean credentialsChanged, + int priority) throws ServicePluginException { + + } + + @Override + public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, + TaskAttemptEndReason endReason, + @Nullable String diagnostics) throws + ServicePluginException { + + } + + @Override + public InetSocketAddress getAddress() throws ServicePluginException { + return null; + } + + @Override + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException { + + } + + @Override + public void dagComplete(int dagIdentifier) throws ServicePluginException { + getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null); + } + + @Override + public Object getMetaInfo() throws ServicePluginException { + return null; + } + } }