Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0472E17ACA for ; Fri, 24 Apr 2015 00:25:56 +0000 (UTC) Received: (qmail 35350 invoked by uid 500); 24 Apr 2015 00:25:55 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 35231 invoked by uid 500); 24 Apr 2015 00:25:55 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 34903 invoked by uid 99); 24 Apr 2015 00:25:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Apr 2015 00:25:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 635A1E1800; Fri, 24 Apr 2015 00:25:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Fri, 24 Apr 2015 00:26:03 -0000 Message-Id: <521e12c0daed418aa028e1038a8d137c@git.apache.org> In-Reply-To: <3ae8c698c8aa44f5b905b43cdcaac9f0@git.apache.org> References: <3ae8c698c8aa44f5b905b43cdcaac9f0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/50] [abbrv] tez git commit: TEZ-714. 
OutputCommitters should not run in the main AM dispatcher thread (zjffdu) http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java new file mode 100644 index 0000000..0df8a4f --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java @@ -0,0 +1,1682 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.dag.app.dag.impl; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.security.ACLManager; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.EdgeProperty.DataSourceType; +import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.GroupInputEdge; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import 
org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.TaskAttemptListener; +import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.DAGTerminationCause; +import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.VertexTerminationCause; +import org.apache.tez.dag.app.dag.event.CallableEventType; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; +import org.apache.tez.dag.app.dag.event.DAGEvent; +import org.apache.tez.dag.app.dag.event.DAGEventStartDag; +import org.apache.tez.dag.app.dag.event.DAGEventType; +import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning; +import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; +import org.apache.tez.dag.app.dag.event.TaskEvent; +import org.apache.tez.dag.app.dag.event.TaskEventType; +import org.apache.tez.dag.app.dag.event.VertexEvent; +import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; +import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; +import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; +import org.apache.tez.dag.app.dag.event.VertexEventType; +import org.apache.tez.dag.app.dag.impl.DAGImpl.OutputKey; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEventHandler; +import org.apache.tez.dag.history.HistoryEventType; 
+import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.OutputCommitter; +import org.apache.tez.runtime.api.OutputCommitterContext; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * + * The commit tests here differ from those in TestDAGImpl & + * TestVertexImpl in that the commits here run in a separate thread, so they + * need special attention (e.g. waiting for asynchronous commits to finish). + * + * Two kinds of commit are tested: + *
  • test XXX_OnDAGSuccess means TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true + *
  • test XXX_OnVertexSuccess means TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false + * + */ +public class TestCommit { + + private static final Log LOG = LogFactory.getLog(TestCommit.class); + private TezDAGID dagId; + private static Configuration conf = new Configuration(); + private DrainDispatcher dispatcher; + private Credentials fsTokens; + private AppContext appContext; + private ACLManager aclManager; + private ApplicationAttemptId appAttemptId; + private DAGImpl dag; + private TaskEventDispatcher taskEventDispatcher; + private VertexEventDispatcher vertexEventDispatcher; + private DagEventDispatcher dagEventDispatcher; + private TaskAttemptListener taskAttemptListener; + private TaskHeartbeatHandler thh; + private Clock clock = new SystemClock(); + private DAGFinishEventHandler dagFinishEventHandler; + private HistoryEventHandler historyEventHandler; + private TaskAttemptEventDispatcher taskAttemptEventDispatcher; + + private ExecutorService rawExecutor; + private ListeningExecutorService execService; + + private class DagEventDispatcher implements EventHandler { + @Override + public void handle(DAGEvent event) { + dag.handle(event); + } + } + + private class VertexEventDispatcher implements EventHandler { + + @SuppressWarnings("unchecked") + @Override + public void handle(VertexEvent event) { + Vertex vertex = dag.getVertex(event.getVertexId()); + ((EventHandler) vertex).handle(event); + } + } + + private class TaskEventDispatcher implements EventHandler { + @SuppressWarnings("unchecked") + @Override + public void handle(TaskEvent event) { + Vertex vertex = dag.getVertex(event.getTaskID().getVertexID()); + Task task = vertex.getTask(event.getTaskID()); + ((EventHandler) task).handle(event); + } + } + + private class TaskAttemptEventDispatcher implements + EventHandler { + @Override + public void handle(TaskAttemptEvent event) { + // Ignore + } + } + + private class DAGFinishEventHandler implements + EventHandler { + @Override + public void 
handle(DAGAppMasterEventDAGFinished event) { + } + } + + public static class CountingOutputCommitter extends OutputCommitter { + + public volatile int initCounter = 0; + public volatile int setupCounter = 0; + public volatile int commitCounter = 0; + public volatile int abortCounter = 0; + private boolean throwError; + private volatile boolean blockCommit; + + public CountingOutputCommitter(OutputCommitterContext context) { + super(context); + this.throwError = false; + } + + @Override + public void initialize() throws IOException { + if (getContext().getUserPayload() != null + && getContext().getUserPayload().hasPayload()) { + CountingOutputCommitterConfig conf = new CountingOutputCommitterConfig( + getContext().getUserPayload()); + this.throwError = conf.throwError; + this.blockCommit = conf.blockCommit; + } + ++initCounter; + } + + @Override + public void setupOutput() throws IOException { + ++setupCounter; + } + + @Override + public void commitOutput() throws IOException { + ++commitCounter; + while (blockCommit) { + try { + Thread.sleep(100); + LOG.info("committing output:" + getContext().getOutputName()); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + if (throwError) { + throw new RuntimeException("I can throwz exceptions in commit"); + } + } + + public void unblockCommit() { + blockCommit = false; + } + + @Override + public void abortOutput(VertexStatus.State finalState) throws IOException { + ++abortCounter; + } + + public static class CountingOutputCommitterConfig implements Writable { + + boolean throwError = false; + boolean blockCommit = false; + + public CountingOutputCommitterConfig() { + } + + public CountingOutputCommitterConfig(boolean throwError, + boolean blockCommit) { + this.throwError = throwError; + this.blockCommit = blockCommit; + } + + public CountingOutputCommitterConfig(UserPayload payload) + throws IOException { + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(payload.getPayload()); + 
this.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeBoolean(throwError); + out.writeBoolean(blockCommit); + } + + @Override + public void readFields(DataInput in) throws IOException { + throwError = in.readBoolean(); + blockCommit = in.readBoolean(); + } + + public byte[] toUserPayload() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(bos); + write(out); + return bos.toByteArray(); + } + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void setupDAG(DAGPlan dagPlan) { + conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); + appAttemptId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(100, 1), 1); + dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1); + Assert.assertNotNull(dagId); + dispatcher = new DrainDispatcher(); + fsTokens = new Credentials(); + appContext = mock(AppContext.class); + rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build()); + execService = MoreExecutors.listeningDecorator(rawExecutor); + + doReturn(execService).when(appContext).getExecService(); + historyEventHandler = mock(HistoryEventHandler.class); + aclManager = new ACLManager("amUser"); + doReturn(conf).when(appContext).getAMConf(); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); + doReturn(appAttemptId.getApplicationId()).when(appContext) + .getApplicationID(); + doReturn(dagId).when(appContext).getCurrentDAGID(); + doReturn(historyEventHandler).when(appContext).getHistoryHandler(); + doReturn(aclManager).when(appContext).getAMACLManager(); + dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(), + taskAttemptListener, fsTokens, clock, "user", thh, appContext); + doReturn(dag).when(appContext).getCurrentDAG(); + 
doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler(); + + dispatcher.register(CallableEventType.class, new CallableEventDispatcher()); + taskEventDispatcher = new TaskEventDispatcher(); + dispatcher.register(TaskEventType.class, taskEventDispatcher); + taskAttemptEventDispatcher = new TaskAttemptEventDispatcher(); + dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher); + vertexEventDispatcher = new VertexEventDispatcher(); + dispatcher.register(VertexEventType.class, vertexEventDispatcher); + dagEventDispatcher = new DagEventDispatcher(); + dispatcher.register(DAGEventType.class, dagEventDispatcher); + dagFinishEventHandler = new DAGFinishEventHandler(); + dispatcher.register(DAGAppMasterEventType.class, dagFinishEventHandler); + dispatcher.init(conf); + dispatcher.start(); + } + + @After + public void teardown() { + if (dispatcher != null) { + dispatcher.await(); + dispatcher.stop(); + } + if (execService != null) { + execService.shutdownNow(); + } + } + + private void waitUntil(DAGImpl dag, DAGState state) { + while (dag.getState() != state) { + LOG.info("Wait for dag go to state:" + state); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private void waitUntil(VertexImpl vertex, VertexState state) { + while (vertex.getState() != state) { + LOG.info("Wait for vertex " + vertex.getLogIdentifier() + " go to state:" + + state); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private void waitForCommitCompleted(VertexImpl vertex, String outputName) { + while (vertex.commitFutures.containsKey(outputName)) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + LOG.info("Wait for vertex commit " + outputName + " to complete"); + } + } + + private void waitForCommitCompleted(DAGImpl vertex, OutputKey outputKey) { + while (vertex.commitFutures.containsKey(outputKey)) { + try 
{ + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + LOG.info("Wait for dag commit " + outputKey + " to complete"); + } + } + + // v1->v3 + // v2->v3 + // vertex_group (v1, v2) + private DAGPlan createDAGPlan(boolean vertexGroupCommitSucceeded, + boolean v3CommitSucceeded) throws Exception { + LOG.info("Setting up group dag plan"); + int dummyTaskCount = 1; + Resource dummyTaskResource = Resource.newInstance(1, 1); + org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create( + "vertex1", ProcessorDescriptor.create("Processor"), dummyTaskCount, + dummyTaskResource); + org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create( + "vertex2", ProcessorDescriptor.create("Processor"), dummyTaskCount, + dummyTaskResource); + org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create( + "vertex3", ProcessorDescriptor.create("Processor"), dummyTaskCount, + dummyTaskResource); + + DAG dag = DAG.create("testDag"); + String groupName1 = "uv12"; + OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create( + CountingOutputCommitter.class.getName()).setUserPayload( + UserPayload.create(ByteBuffer + .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( + !vertexGroupCommitSucceeded, true).toUserPayload()))); + OutputCommitterDescriptor ocd2 = OutputCommitterDescriptor.create( + CountingOutputCommitter.class.getName()).setUserPayload( + UserPayload.create(ByteBuffer + .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( + !v3CommitSucceeded, true).toUserPayload()))); + + org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, + v1, v2); + OutputDescriptor outDesc = OutputDescriptor.create("output.class"); + uv12.addDataSink("v12Out", DataSinkDescriptor.create(outDesc, ocd1, null)); + v3.addDataSink("v3Out", DataSinkDescriptor.create(outDesc, ocd2, null)); + + GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, EdgeProperty.create( + 
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, + OutputDescriptor.create("dummy output class"), + InputDescriptor.create("dummy input class")), InputDescriptor + .create("merge.class")); + + dag.addVertex(v1); + dag.addVertex(v2); + dag.addVertex(v3); + dag.addEdge(e1); + return dag.createDag(conf, null, null, null, true); + } + + private DAGPlan createDAGPlan_SingleVertexWith2Committer( + boolean commit1Succeed, boolean commit2Succeed) throws IOException { + return createDAGPlan_SingleVertexWith2Committer(commit1Succeed, commit2Succeed, false); + } + + // used for route event error in VM + private DAGPlan createDAGPlan_SingleVertexWith2Committer + (boolean commit1Succeed, boolean commit2Succeed, boolean customVM) throws IOException { + LOG.info("Setting up group dag plan"); + int dummyTaskCount = 1; + Resource dummyTaskResource = Resource.newInstance(1, 1); + org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create( + "vertex1", ProcessorDescriptor.create("Processor"), dummyTaskCount, + dummyTaskResource); + if (customVM) { + v1.setVertexManagerPlugin( + VertexManagerPluginDescriptor.create( + FailOnVMEventReceivedlVertexManager.class.getName())); + } + OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create( + CountingOutputCommitter.class.getName()).setUserPayload( + UserPayload.create(ByteBuffer + .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( + !commit1Succeed, true).toUserPayload()))); + OutputCommitterDescriptor ocd2 = OutputCommitterDescriptor.create( + CountingOutputCommitter.class.getName()).setUserPayload( + UserPayload.create(ByteBuffer + .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( + !commit2Succeed, true).toUserPayload()))); + + DAG dag = DAG.create("testDag"); + dag.addVertex(v1); + OutputDescriptor outDesc = OutputDescriptor.create("output.class"); + v1.addDataSink("v1Out_1", DataSinkDescriptor.create(outDesc, ocd1, null)); + 
v1.addDataSink("v1Out_2", DataSinkDescriptor.create(outDesc, ocd2, null)); + return dag.createDag(conf, null, null, null, true); + } + + private void initDAG(DAGImpl dag) { + dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT)); + Assert.assertEquals(DAGState.INITED, dag.getState()); + } + + @SuppressWarnings("unchecked") + private void startDAG(DAGImpl impl) { + dispatcher.getEventHandler().handle( + new DAGEventStartDag(impl.getID(), null)); + dispatcher.await(); + Assert.assertEquals(DAGState.RUNNING, impl.getState()); + } + + @Test(timeout = 5000) + public void testVertexSucceedWithoutCommit() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertNull(v1.getTerminationCause()); + Assert.assertTrue(v1.commitFutures.isEmpty()); + CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_1"); + CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_2"); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); + Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); + Assert.assertEquals(0, v1OutputCommitter_1.commitCounter); + Assert.assertEquals(0, v1OutputCommitter_1.abortCounter); + + Assert.assertEquals(1, v1OutputCommitter_2.initCounter); + Assert.assertEquals(1, v1OutputCommitter_2.setupCounter); + Assert.assertEquals(0, v1OutputCommitter_2.commitCounter); + Assert.assertEquals(0, v1OutputCommitter_2.abortCounter); + } + + @Test(timeout = 5000) + public void testVertexCommit_OnVertexSuccess() throws Exception { + 
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_1"); + v1OutputCommitter_1.unblockCommit(); + waitForCommitCompleted(v1, "v1Out_1"); + // still in COMMITTING due to another pending commit + Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + + CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_2"); + v1OutputCommitter_2.unblockCommit(); + waitUntil(v1, VertexState.SUCCEEDED); + Assert.assertNull(v1.getTerminationCause()); + Assert.assertTrue(v1.commitFutures.isEmpty()); + + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); + Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); + Assert.assertEquals(1, v1OutputCommitter_1.commitCounter); + Assert.assertEquals(0, v1OutputCommitter_1.abortCounter); + + Assert.assertEquals(1, v1OutputCommitter_2.initCounter); + Assert.assertEquals(1, v1OutputCommitter_2.setupCounter); + Assert.assertEquals(1, v1OutputCommitter_2.commitCounter); + Assert.assertEquals(0, v1OutputCommitter_2.abortCounter); + } + + // the first commit fail which cause the second commit abort + @Test(timeout = 5000) + public void testVertexCommitFail1_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan_SingleVertexWith2Committer(false, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + 
Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_1"); + v1OutputCommitter_1.unblockCommit(); + waitUntil(v1, VertexState.FAILED); + + Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, + v1.getTerminationCause()); + Assert.assertTrue(v1.commitFutures.isEmpty()); + CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_2"); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); + Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); + Assert.assertEquals(1, v1OutputCommitter_1.commitCounter); + Assert.assertEquals(1, v1OutputCommitter_1.abortCounter); + + Assert.assertEquals(1, v1OutputCommitter_2.initCounter); + Assert.assertEquals(1, v1OutputCommitter_2.setupCounter); + // can't verify the commitCounter because v1OutputCommitter_2 may not be started + Assert.assertEquals(1, v1OutputCommitter_2.abortCounter); + } + + // the first commit succeed while the second fails + @Test(timeout = 5000) + public void testVertexCommitFail2_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, false)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_1"); + v1OutputCommitter_1.unblockCommit(); + waitForCommitCompleted(v1, "v1Out_1"); + Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + + CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_2"); + v1OutputCommitter_2.unblockCommit(); + waitUntil(v1, VertexState.FAILED); 
+ + Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, + v1.getTerminationCause()); + Assert.assertTrue(v1.commitFutures.isEmpty()); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); + Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); + Assert.assertEquals(1, v1OutputCommitter_1.commitCounter); + Assert.assertEquals(1, v1OutputCommitter_1.abortCounter); + + Assert.assertEquals(1, v1OutputCommitter_2.initCounter); + Assert.assertEquals(1, v1OutputCommitter_2.setupCounter); + Assert.assertEquals(1, v1OutputCommitter_2.commitCounter); + Assert.assertEquals(1, v1OutputCommitter_2.abortCounter); + } + + @Test(timeout = 5000) + public void testVertexKilledWhileCommitting() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + // kill dag which will trigger the vertex killed event + dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); + dispatcher.await(); + Assert.assertEquals(VertexState.KILLED, v1.getState()); + Assert.assertTrue(v1.commitFutures.isEmpty()); + Assert.assertEquals(VertexTerminationCause.DAG_KILL, + v1.getTerminationCause()); + Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert + .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + + CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_1"); + CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_2"); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); + Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); + // commit may not have started, so can't verify commitCounter + 
Assert.assertEquals(1, v1OutputCommitter_1.abortCounter); + + Assert.assertEquals(1, v1OutputCommitter_2.initCounter); + Assert.assertEquals(1, v1OutputCommitter_2.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v1OutputCommitter_2.abortCounter); + } + + @Test(timeout = 5000) + public void testVertexRescheduleWhileCommitting() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + // reschedule task + v1.handle(new VertexEventTaskReschedule(TezTaskID.getInstance( + v1.getVertexId(), 2))); + dispatcher.await(); + Assert.assertEquals(VertexState.FAILED, v1.getState()); + Assert.assertEquals(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, + v1.getTerminationCause()); + Assert.assertTrue(v1.commitFutures.isEmpty()); + Assert.assertEquals(DAGState.FAILED, dag.getState()); + Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, + dag.getTerminationCause()); + + CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_1"); + CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_2"); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); + Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v1OutputCommitter_1.abortCounter); + + Assert.assertEquals(1, v1OutputCommitter_2.initCounter); + Assert.assertEquals(1, v1OutputCommitter_2.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, 
v1OutputCommitter_2.abortCounter); + } + + @Test(timeout = 5000) + public void testVertexRouteEventErrorWhileCommitting() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + // reschedule task + VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex1", ByteBuffer.wrap(new byte[0])); + TezEvent tezEvent = new TezEvent(vmEvent, + new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", + null, null)); + v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent))); + waitUntil(dag, DAGState.FAILED); + + Assert.assertEquals(VertexState.FAILED, v1.getState()); + Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, + v1.getTerminationCause()); + Assert.assertTrue(v1.commitFutures.isEmpty()); + Assert.assertEquals(DAGState.FAILED, dag.getState()); + Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, + dag.getTerminationCause()); + + CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_1"); + CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_2"); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); + Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v1OutputCommitter_1.abortCounter); + + Assert.assertEquals(1, v1OutputCommitter_2.initCounter); + Assert.assertEquals(1, v1OutputCommitter_2.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v1OutputCommitter_2.abortCounter); + } + + 
@Test(timeout = 5000) + public void testVertexInternalErrorWhileCommiting() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.COMMITTING, v1.getState()); + // internal error + v1.handle(new VertexEvent(v1.getVertexId(), + VertexEventType.V_INTERNAL_ERROR)); + dispatcher.await(); + Assert.assertEquals(VertexState.ERROR, v1.getState()); + Assert.assertEquals(VertexTerminationCause.INTERNAL_ERROR, + v1.getTerminationCause()); + Assert.assertEquals(DAGState.ERROR, dag.getState()); + Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, + dag.getTerminationCause()); + + CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_1"); + CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 + .getOutputCommitter("v1Out_2"); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); + Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); + // commit may not have started, so can't verify commitCounter + // TODO abort it when internal error happens TEZ-2250 + // Assert.assertEquals(1, v1OutputCommitter_1.abortCounter); + + Assert.assertEquals(1, v1OutputCommitter_2.initCounter); + Assert.assertEquals(1, v1OutputCommitter_2.setupCounter); + // commit may not have started, so can't verify commitCounter + // TODO abort it when internal error happens TEZ-2250 + // Assert.assertEquals(1, v1OutputCommitter_2.abortCounter); + } + + @Test(timeout = 5000) + public void testDAGCommitSucceeded_OnDAGSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + 
VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // need to make vertices to go to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.COMMITTING); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + v12OutputCommitter.unblockCommit(); + // still in COMMITTING due to another pending commit + waitUntil(dag, DAGState.COMMITTING); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.SUCCEEDED); + Assert.assertTrue(dag.commitFutures.isEmpty()); + Assert.assertNull(dag.getTerminationCause()); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, v12OutputCommitter.commitCounter); + Assert.assertEquals(0, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + Assert.assertEquals(0, v3OutputCommitter.abortCounter); + } + + // first commit(v12Out) succeed and then the second commit(v3Out) fail + @Test(timeout = 5000) + public void testDAGCommitFail1_OnDAGSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(true, false)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) 
dag.getVertex("vertex3"); + // need to make vertices to go to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.COMMITTING); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + v12OutputCommitter.unblockCommit(); + waitForCommitCompleted(dag, new OutputKey("v12Out", "uv12", true)); + // still in COMMITTING due to another pending commit + Assert.assertEquals(DAGState.COMMITTING, dag.getState()); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.FAILED); + + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v3.getState()); + + Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, + dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, v12OutputCommitter.commitCounter); + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // the first commit(v12Out) fail + @Test(timeout = 5000) + public void testDAGCommitFail2_OnDAGSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(false, true)); + initDAG(dag); + startDAG(dag); + 
VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // need to make vertices to go to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.COMMITTING); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v12OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.FAILED); + + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v3.getState()); + + Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, + dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, v12OutputCommitter.commitCounter); + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // commit of v3Out complete first then commit of v12Out complete + @Test(timeout = 5000) + public void testDAGCommitSucceeded1_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) 
dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + waitUntil(v3, VertexState.SUCCEEDED); + // dag go to COMMITTING due to the pending vertex group commit of v1,v2 + waitUntil(dag, DAGState.COMMITTING); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + v12OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.SUCCEEDED); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, v12OutputCommitter.commitCounter); + Assert.assertEquals(0, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + Assert.assertEquals(0, v3OutputCommitter.abortCounter); + } + + // commit of v12Out complete first then commit of v3Out + @Test(timeout = 5000) + public void testDAGCommitSucceeded2_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + 
VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + v12OutputCommitter.unblockCommit(); + // ugly (wait for commit event sent out) + Thread.sleep(500); + dispatcher.await(); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.SUCCEEDED); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, v12OutputCommitter.commitCounter); + Assert.assertEquals(0, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + Assert.assertEquals(0, v3OutputCommitter.abortCounter); + } + + // commit of vertex group(v1,v2) fail and commit of v3 is not completed + @Test(timeout = 5000) + public void testDAGCommitFail1_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(false, true)); + 
initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + v12OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.FAILED); + // v3 is killed due to the commit failure of the vertex group (v1,v2) + Assert.assertEquals(VertexState.KILLED, v3.getState()); + Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE, + v3.getTerminationCause()); + Assert.assertTrue(v3.commitFutures.isEmpty()); + Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, + dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, v12OutputCommitter.commitCounter); + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // commit of vertex v3 fail and commit of vertex group (v1,v2) is not completed + 
@Test(timeout = 5000) + public void testDAGCommitFail2_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(true, false)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.FAILED); + + Assert.assertEquals(VertexState.FAILED, v3.getState()); + Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, + v3.getTerminationCause()); + Assert.assertTrue(v3.commitFutures.isEmpty()); + Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, + dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + 
Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // vertex group (v1,v2) succeeded first and then commit of vertex v3 fail + @Test (timeout = 5000) + public void testDAGCommitFail3_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(true, false)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + v12OutputCommitter.unblockCommit(); + + waitForCommitCompleted(dag, new OutputKey("v12Out", "uv12", true)); + + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.FAILED); + + Assert.assertEquals(VertexState.FAILED, v3.getState()); + Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, + v3.getTerminationCause()); + Assert.assertTrue(v3.commitFutures.isEmpty()); + Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, + dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, 
v12OutputCommitter.commitCounter); + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // commit of vertex v3 succeeded first and then commit of vertex group(v1,v2) fail + @Test(timeout = 5000) + public void testDAGCommitFail4_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(false, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + waitForCommitCompleted(dag, new OutputKey("v3Out", "vertex3", true)); + waitUntil(v3, VertexState.SUCCEEDED); + Assert.assertTrue(v3.commitFutures.isEmpty()); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + v12OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.FAILED); + + Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, + dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + 
+ Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, v12OutputCommitter.commitCounter); + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + @Test (timeout = 5000) + public void testDAGInternalErrorWhileCommiting() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // need to make vertices to go to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.COMMITTING); + dag.handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR)); + waitUntil(dag, DAGState.ERROR); + + Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause()); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + // TODO abort it when internal error happens TEZ-2250 + // Assert.assertEquals(0, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, 
v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + // TODO abort it when internal error happens TEZ-2250 + // Assert.assertEquals(0, v3OutputCommitter.abortCounter); + } + + // Kill dag while it is in COMMITTING in the case of + // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true + @Test(timeout = 5000) + public void testDAGKilledWhileCommitting1_OnDAGSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // need to make vertices to go to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.COMMITTING); + dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); + waitUntil(dag, DAGState.KILLED); + + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v3.getState()); + Assert + .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + 
Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // Kill dag while it is in COMMITTING in the case of + // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false + @Test(timeout = 5000) + public void testDAGKilledWhileCommitting1_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // need to make vertices to go to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + // dag is still in RUNNING because v3 has not completed + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + // dag go to COMMITTING due to the pending commit of v12Out + waitUntil(dag, DAGState.COMMITTING); + dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); + waitUntil(dag, DAGState.KILLED); + + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v3.getState()); + Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert + .assertEquals(DAGTerminationCause.DAG_KILL, 
dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // DAG killed while dag is still in RUNNING and vertex is in COMMITTING + @Test(timeout = 5000) + public void testDAGKilledWhileRunning_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // need to make vertices to go to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + // dag is still in RUNNING because v3 has not completed + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); + waitUntil(dag, DAGState.KILLED); + + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.KILLED, v3.getState()); + 
Assert.assertEquals(VertexTerminationCause.DAG_KILL, v3.getTerminationCause()); + Assert.assertTrue(v3.commitFutures.isEmpty()); + Assert.assertEquals(DAGState.KILLED, dag.getState()); + Assert + .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + @Test(timeout = 5000) + public void testDAGCommitVertexRerunWhileCommitting_OnDAGSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // need to make vertices to go to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.COMMITTING); + dag.handle(new DAGEventVertexReRunning(v1.getVertexId())); + waitUntil(dag, DAGState.FAILED); + + Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, 
dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // An INTERNAL_ERROR received while the DAG is COMMITTING must move the DAG to ERROR + // and abort both the v12Out and v3Out outputs. + @Test(timeout = 5000) + public void testDAGCommitInternalErrorWhileCommiting_OnDAGSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // complete task 0 of each vertex so that the vertices move to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.COMMITTING); + dag.handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR)); + waitUntil(dag, DAGState.ERROR); + + Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause()); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); +
Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // With COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS disabled, failing to record VERTEX_GROUP_COMMIT_FINISHED + // must fail the DAG with RECOVERY_FAILURE, kill vertex3 and abort both outputs. + @Test(timeout = 5000) + public void testVertexGroupCommitFinishedEventFail() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlan(true, true)); + MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext); + doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler(); + mockHistoryEventHandler.failVertexGroupCommitFinishedEvent = true; + + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // complete task 0 of each vertex so that the vertices move to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v12OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.FAILED); + + Assert.assertEquals(DAGState.FAILED, dag.getState()); + Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, + dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); +
Assert.assertEquals(VertexState.KILLED, v3.getState()); + Assert.assertEquals(VertexTerminationCause.OTHER_VERTEX_FAILURE, v3.getTerminationCause()); + Assert.assertTrue(v3.commitFutures.isEmpty()); + + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + Assert.assertEquals(1, v12OutputCommitter.commitCounter); + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit may not have started, so can't verify commitCounter + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // Failing to record DAG_COMMIT_STARTED must fail the DAG with RECOVERY_FAILURE before any + // output commit starts; both outputs are aborted. + @Test(timeout = 5000) + public void testDAGCommitStartedEventFail() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(true, true)); + MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext); + doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler(); + mockHistoryEventHandler.failDAGCommitStartedEvent = true; + + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // complete task 0 of each vertex so that the vertices move to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.FAILED); + Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, dag.getTerminationCause()); + Assert.assertTrue(dag.commitFutures.isEmpty()); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter =
(CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit has not started + Assert.assertEquals(0, v12OutputCommitter.commitCounter); + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit has not started (check v3's own counter, not v12's) + Assert.assertEquals(0, v3OutputCommitter.commitCounter); + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + } + + // Killing the DAG must cancel its commits, whether they are already running or still queued + // in the thread pool. ControlledThreadPoolExecutor holds every task before it executes, so no + // commit actually starts in this test. + @Test(timeout = 5000) + public void testCommitCanceled() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + true); + setupDAG(createDAGPlan(true, true)); + // executor that blocks each task before running it until startFlag is set (never, here) + rawExecutor = new ControlledThreadPoolExecutor(1); + execService = MoreExecutors.listeningDecorator(rawExecutor); + doReturn(execService).when(appContext).getExecService(); + + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + // complete task 0 of each vertex so that the vertices move to SUCCEEDED + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + waitUntil(dag, DAGState.COMMITTING); + // both commits have been submitted to the thread pool + Assert.assertEquals(2, dag.commitFutures.size()); + + dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); + waitUntil(dag, DAGState.KILLED); +
Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + // all commit futures have been canceled and cleared + Assert.assertTrue(dag.commitFutures.isEmpty()); + + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out"); + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + Assert.assertEquals(1, v12OutputCommitter.initCounter); + Assert.assertEquals(1, v12OutputCommitter.setupCounter); + // commit never ran: ControlledThreadPoolExecutor blocks tasks before they execute + Assert.assertEquals(0, v12OutputCommitter.commitCounter); + Assert.assertEquals(1, v12OutputCommitter.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + // commit never ran: ControlledThreadPoolExecutor blocks tasks before they execute + Assert.assertEquals(0, v3OutputCommitter.commitCounter); + Assert.assertEquals(1, v3OutputCommitter.abortCounter); + + } + + // VertexManager that delegates to ImmediateStartVertexManager and then throws on every + // VertexManagerEvent it receives. + public static class FailOnVMEventReceivedlVertexManager extends ImmediateStartVertexManager { + + public FailOnVMEventReceivedlVertexManager(VertexManagerPluginContext context) { + super(context); + } + + @Override + public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { + super.onVertexManagerEventReceived(vmEvent); + throw new RuntimeException("fail vm"); + } + + } + + // HistoryEventHandler whose handleCriticalEvent() throws IOException for the event types enabled + // by its fail* flags. It never delegates to the superclass, so all other critical events are dropped. + private static class MockHistoryEventHandler extends HistoryEventHandler { + + public boolean failVertexGroupCommitFinishedEvent = false; + public boolean failDAGCommitStartedEvent = false; + + public MockHistoryEventHandler(AppContext context) { + super(context); + } + + @Override + public void handleCriticalEvent(DAGHistoryEvent event) throws IOException { + if (event.getHistoryEvent().getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED + && failVertexGroupCommitFinishedEvent) { + throw new IOException("fail VertexGroupCommitFinishedEvent"); + } + if (event.getHistoryEvent().getEventType() == HistoryEventType.DAG_COMMIT_STARTED + && failDAGCommitStartedEvent) { + throw new IOException("fail DAGCommitStartedEvent"); + } + } + } + + // ThreadPoolExecutor that holds each task in beforeExecute() until startFlag is set, so submitted + // tasks are accepted but do not run unless a test releases them. + private static class ControlledThreadPoolExecutor extends ThreadPoolExecutor { + + public ControlledThreadPoolExecutor(int poolSize) { + this(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()); + } + + public ControlledThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + // volatile: written by the test thread but polled by a pool worker thread + public volatile boolean startFlag = false; + + @Override + protected void beforeExecute(Thread t, Runnable r) { + while (!startFlag) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // Interrupted (e.g. by shutdownNow()): keep the interrupt status and stop waiting, + // instead of swallowing it and spinning forever in this loop. + Thread.currentThread().interrupt(); + break; + } + } + super.beforeExecute(t, r); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 98c8492..7e944ef 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -1364,7 +1364,7 @@ public class TestDAGImpl { } } - // error on vertex -> dag error -> successful vertex output not aborted + // error on vertex -> dag error Vertex errorVertex = mrrDag.getVertex("vertex3"); dispatcher.getEventHandler().handle(new VertexEvent( errorVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR)); @@ -1383,7 +1383,8 @@ public class TestDAGImpl { Assert.assertEquals(1, committer.initCounter); Assert.assertEquals(1, committer.setupCounter); } else { - Assert.assertEquals(0,
committer.abortCounter); + // the abort should have no side effect on the already-successful commit + Assert.assertEquals(1, committer.abortCounter); + Assert.assertEquals(1, committer.commitCounter); + Assert.assertEquals(1, committer.initCounter); + Assert.assertEquals(1, committer.setupCounter); http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 5ad320e..891da23 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -3341,8 +3341,7 @@ public class TestVertexImpl { Assert.assertEquals(1, committer.commitCounter); - // FIXME need to verify whether abort needs to be called if commit fails - Assert.assertEquals(0, committer.abortCounter); + Assert.assertEquals(1, committer.abortCounter); Assert.assertEquals(1, committer.initCounter); Assert.assertEquals(1, committer.setupCounter); } @@ -3386,8 +3385,7 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v.getTerminationCause()); Assert.assertEquals(1, committer.commitCounter); - // FIXME need to verify whether abort needs to be called if commit fails - Assert.assertEquals(0, committer.abortCounter); + Assert.assertEquals(1, committer.abortCounter); Assert.assertEquals(1, committer.initCounter); Assert.assertEquals(1, committer.setupCounter); } http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java index 185f4a4..d1bc46c 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java @@ -82,6 +82,8 @@ public class ExampleDriver { "Filters lines by the specified word using broadcast edge"); pgd.addClass("filterLinesByWordOneToOne", FilterLinesByWordOneToOne.class, "Filters lines by the specified word using OneToOne edge"); + pgd.addClass("multiplecommitsExample", MultipleCommitsExample.class, + "Job with multiple commits in both vertex group and vertex"); exitCode = pgd.run(argv); } catch(Throwable e){