Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0851117DF5 for ; Fri, 8 May 2015 04:43:23 +0000 (UTC) Received: (qmail 38788 invoked by uid 500); 8 May 2015 04:43:23 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 38746 invoked by uid 500); 8 May 2015 04:43:22 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 38737 invoked by uid 99); 8 May 2015 04:43:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 May 2015 04:43:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9D7EDE4422; Fri, 8 May 2015 04:43:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjffdu@apache.org To: commits@tez.apache.org Message-Id: <78c933b730514b9db2cfb0350d0ad02b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly (zjffdu) Date: Fri, 8 May 2015 04:43:22 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 05f77fe2b -> 4a6808ce4 TEZ-2410. 
VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a6808ce Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a6808ce Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a6808ce Branch: refs/heads/master Commit: 4a6808ce4c99458653bbe4328dfcad24649a48fb Parents: 05f77fe Author: Jeff Zhang Authored: Fri May 8 12:42:46 2015 +0800 Committer: Jeff Zhang Committed: Fri May 8 12:42:46 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 61 ++- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 - .../apache/tez/dag/app/dag/impl/TestCommit.java | 454 ++++++++++++++++++- 4 files changed, 477 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ba8e9d8..3520768 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES Default max limit increased. Should not affect existing users. ALL CHANGES: + TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly TEZ-776. Reduce AM mem usage caused by storing TezEvents TEZ-2423. Tez UI: Remove Attempt Index column from task->attempts page TEZ-2416. Tez UI: Make tooltips display faster. 
http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index f769565..1726c18 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -212,7 +212,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, @VisibleForTesting Map> commitFutures = new HashMap>(); - private Set succeededCommits = new HashSet(); private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); @@ -457,7 +456,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, Set outputs; Map edgeMergedInputs; int successfulMembers; - boolean committed; + int successfulCommits; + boolean commitStarted; + VertexGroupInfo(PlanVertexGroupInfo groupInfo) { groupName = groupInfo.getGroupName(); groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList()); @@ -468,10 +469,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } outputs = Sets.newHashSet(groupInfo.getOutputsList()); successfulMembers = 0; - committed = false; + successfulCommits = 0; + commitStarted = false; + } + + public boolean isInCommitting() { + return commitStarted && successfulCommits < outputs.size(); + } + + public boolean isCommitted() { + return commitStarted && successfulCommits == outputs.size(); } } + public DAGImpl(TezDAGID dagId, Configuration amConf, DAGPlan jobPlan, @@ -962,7 +973,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // commit all shared outputs for (final VertexGroupInfo groupInfo : vertexGroups.values()) { if (!groupInfo.outputs.isEmpty()) { - groupInfo.committed = true; + groupInfo.commitStarted = true; final Vertex 
v = getVertex(groupInfo.groupMembers.iterator().next()); for (final String outputName : groupInfo.outputs) { final OutputKey outputKey = new OutputKey(outputName, groupInfo.groupName, true); @@ -1920,7 +1931,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, + " data, groupName=" + groupInfo.groupName); continue; } - groupInfo.committed = true; + groupInfo.commitStarted = true; final Vertex v = getVertex(groupInfo.groupMembers.iterator().next()); try { appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), @@ -1966,11 +1977,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, + ", vertexId=" + vertex.getVertexId()); if (!commitAllOutputsOnSuccess) { - // partial output may already have been committed. fail if so + // partial output may already have been in committing or committed. fail if so List groupList = vertexGroupInfo.get(vertex.getName()); if (groupList != null) { for (VertexGroupInfo groupInfo : groupList) { - if (groupInfo.committed) { + if (groupInfo.isInCommitting()) { + String msg = "Aborting job as committing vertex: " + + vertex.getLogIdentifier() + " is re-running"; + LOG.info(msg); + addDiagnostic(msg); + enactKill(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, + VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING); + return true; + } else if (groupInfo.isCommitted()) { String msg = "Aborting job as committed vertex: " + vertex.getLogIdentifier() + " is re-running"; LOG.info(msg); @@ -2091,17 +2110,23 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, boolean recoveryFailed = false; if (commitCompletedEvent.isSucceeded()) { LOG.info("Commit succeeded for output:" + commitCompletedEvent.getOutputKey()); - succeededCommits.add(commitCompletedEvent.getOutputKey()); - if (!commitAllOutputsOnSuccess) { - try { - appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), - new VertexGroupCommitFinishedEvent(getID(), 
commitCompletedEvent.getOutputKey().getEntityName(), - clock.getTime()))); - } catch (IOException e) { - String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace(e); - addDiagnostic(diag); - LOG.error(diag); - recoveryFailed = true; + OutputKey outputKey = commitCompletedEvent.getOutputKey(); + if (outputKey.isVertexGroupOutput){ + VertexGroupInfo vertexGroup = vertexGroups.get(outputKey.getEntityName()); + vertexGroup.successfulCommits++; + if (vertexGroup.isCommitted()) { + if (!commitAllOutputsOnSuccess) { + try { + appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(), + new VertexGroupCommitFinishedEvent(getID(), commitCompletedEvent.getOutputKey().getEntityName(), + clock.getTime()))); + } catch (IOException e) { + String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace(e); + addDiagnostic(diag); + LOG.error(diag); + recoveryFailed = true; + } + } } } } else { http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index a16ee0a..3a9558d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1901,7 +1901,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl vertex.trySetTerminationCause(VertexTerminationCause.RECOVERY_ERROR); return vertex.finished(VertexState.FAILED); } - } else { firstCommit = false; } VertexCommitCallback commitCallback = new VertexCommitCallback(vertex, outputName); http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java index 0df8a4f..8fc29c2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java @@ -26,9 +26,9 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -63,7 +63,6 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.client.VertexStatus; @@ -96,11 +95,13 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.DAGImpl.OutputKey; import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.*; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.runtime.api.Event; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.OutputCommitterContext; import 
org.apache.tez.runtime.api.events.VertexManagerEvent; @@ -144,7 +145,7 @@ public class TestCommit { private TaskHeartbeatHandler thh; private Clock clock = new SystemClock(); private DAGFinishEventHandler dagFinishEventHandler; - private HistoryEventHandler historyEventHandler; + private MockHistoryEventHandler historyEventHandler; private TaskAttemptEventDispatcher taskAttemptEventDispatcher; private ExecutorService rawExecutor; @@ -305,7 +306,7 @@ public class TestCommit { execService = MoreExecutors.listeningDecorator(rawExecutor); doReturn(execService).when(appContext).getExecService(); - historyEventHandler = mock(HistoryEventHandler.class); + historyEventHandler = new MockHistoryEventHandler(appContext); aclManager = new ACLManager("amUser"); doReturn(conf).when(appContext).getAMConf(); doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); @@ -441,6 +442,63 @@ public class TestCommit { return dag.createDag(conf, null, null, null, true); } + // v1->v3 + // v2->v3 + // vertex_group (v1, v2) has 2 shared outputs + private DAGPlan createDAGPlanWith2VertexGroupOutputs(boolean vertexGroupCommitSucceeded1, + boolean vertexGroupCommitSucceeded2, boolean v3CommitSucceeded) throws Exception { + LOG.info("Setting up group dag plan"); + int dummyTaskCount = 1; + Resource dummyTaskResource = Resource.newInstance(1, 1); + org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create( + "vertex1", ProcessorDescriptor.create("Processor"), dummyTaskCount, + dummyTaskResource); + org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create( + "vertex2", ProcessorDescriptor.create("Processor"), dummyTaskCount, + dummyTaskResource); + org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create( + "vertex3", ProcessorDescriptor.create("Processor"), dummyTaskCount, + dummyTaskResource); + + DAG dag = DAG.create("testDag"); + String groupName1 = "uv12"; + OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create( + 
CountingOutputCommitter.class.getName()).setUserPayload( + UserPayload.create(ByteBuffer + .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( + !vertexGroupCommitSucceeded1, true).toUserPayload()))); + OutputCommitterDescriptor ocd2 = OutputCommitterDescriptor.create( + CountingOutputCommitter.class.getName()).setUserPayload( + UserPayload.create(ByteBuffer + .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( + !vertexGroupCommitSucceeded2, true).toUserPayload()))); + OutputCommitterDescriptor ocd3 = OutputCommitterDescriptor.create( + CountingOutputCommitter.class.getName()).setUserPayload( + UserPayload.create(ByteBuffer + .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( + !v3CommitSucceeded, true).toUserPayload()))); + + org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, + v1, v2); + OutputDescriptor outDesc = OutputDescriptor.create("output.class"); + uv12.addDataSink("v12Out1", DataSinkDescriptor.create(outDesc, ocd1, null)); + uv12.addDataSink("v12Out2", DataSinkDescriptor.create(outDesc, ocd2, null)); + v3.addDataSink("v3Out", DataSinkDescriptor.create(outDesc, ocd3, null)); + + GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, + OutputDescriptor.create("dummy output class"), + InputDescriptor.create("dummy input class")), InputDescriptor + .create("merge.class")); + + dag.addVertex(v1); + dag.addVertex(v2); + dag.addVertex(v3); + dag.addEdge(e1); + return dag.createDag(conf, null, null, null, true); + } + private DAGPlan createDAGPlan_SingleVertexWith2Committer( boolean commit1Succeed, boolean commit2Succeed) throws IOException { return createDAGPlan_SingleVertexWith2Committer(commit1Succeed, commit2Succeed, false); @@ -493,7 +551,7 @@ public class TestCommit { } @Test(timeout = 5000) - public void testVertexSucceedWithoutCommit() throws Exception { + public void 
testVertexCommit_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); @@ -509,6 +567,9 @@ public class TestCommit { .getOutputCommitter("v1Out_1"); CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 .getOutputCommitter("v1Out_2"); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); Assert.assertEquals(0, v1OutputCommitter_1.commitCounter); @@ -544,6 +605,8 @@ public class TestCommit { waitUntil(v1, VertexState.SUCCEEDED); Assert.assertNull(v1.getTerminationCause()); Assert.assertTrue(v1.commitFutures.isEmpty()); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); Assert.assertEquals(1, v1OutputCommitter_1.initCounter); Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); @@ -576,8 +639,11 @@ public class TestCommit { Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v1.getTerminationCause()); Assert.assertTrue(v1.commitFutures.isEmpty()); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1 .getOutputCommitter("v1Out_2"); + Assert.assertEquals(1, v1OutputCommitter_1.initCounter); Assert.assertEquals(1, v1OutputCommitter_1.setupCounter); Assert.assertEquals(1, v1OutputCommitter_1.commitCounter); @@ -606,11 +672,13 @@ public class TestCommit { v1OutputCommitter_1.unblockCommit(); waitForCommitCompleted(v1, "v1Out_1"); Assert.assertEquals(VertexState.COMMITTING, v1.getState()); - + CountingOutputCommitter v1OutputCommitter_2 = 
(CountingOutputCommitter) v1 .getOutputCommitter("v1Out_2"); v1OutputCommitter_2.unblockCommit(); waitUntil(v1, VertexState.FAILED); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v1.getTerminationCause()); @@ -647,6 +715,8 @@ public class TestCommit { Assert.assertEquals(DAGState.KILLED, dag.getState()); Assert .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 .getOutputCommitter("v1Out_1"); @@ -685,6 +755,8 @@ public class TestCommit { Assert.assertEquals(DAGState.FAILED, dag.getState()); Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, dag.getTerminationCause()); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 .getOutputCommitter("v1Out_1"); @@ -727,6 +799,8 @@ public class TestCommit { Assert.assertEquals(DAGState.FAILED, dag.getState()); Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, dag.getTerminationCause()); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 .getOutputCommitter("v1Out_1"); @@ -764,6 +838,8 @@ public class TestCommit { Assert.assertEquals(DAGState.ERROR, dag.getState()); Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause()); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); 
CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1 .getOutputCommitter("v1Out_1"); @@ -811,6 +887,20 @@ public class TestCommit { waitUntil(dag, DAGState.SUCCEEDED); Assert.assertTrue(dag.commitFutures.isEmpty()); Assert.assertNull(dag.getTerminationCause()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0); + historyEventHandler.verifyVertexGroupCommitStartedEvent("v3", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("v3", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(1, v12OutputCommitter.initCounter); Assert.assertEquals(1, v12OutputCommitter.setupCounter); @@ -857,10 +947,21 @@ public class TestCommit { Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); Assert.assertEquals(VertexState.SUCCEEDED, v3.getState()); - Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0); + 
historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(1, v12OutputCommitter.initCounter); Assert.assertEquals(1, v12OutputCommitter.setupCounter); @@ -899,6 +1000,8 @@ public class TestCommit { .getOutputCommitter("v3Out"); v12OutputCommitter.unblockCommit(); waitUntil(dag, DAGState.FAILED); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); @@ -907,6 +1010,18 @@ public class TestCommit { Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + 
historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(1, v12OutputCommitter.initCounter); Assert.assertEquals(1, v12OutputCommitter.setupCounter); @@ -953,6 +1068,18 @@ public class TestCommit { v12OutputCommitter.unblockCommit(); waitUntil(dag, DAGState.SUCCEEDED); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(1, v12OutputCommitter.initCounter); Assert.assertEquals(1, v12OutputCommitter.setupCounter); @@ -1001,6 +1128,18 @@ public class TestCommit { v3OutputCommitter.unblockCommit(); waitUntil(dag, DAGState.SUCCEEDED); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + 
historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(1, v12OutputCommitter.initCounter); Assert.assertEquals(1, v12OutputCommitter.setupCounter); @@ -1013,6 +1152,71 @@ public class TestCommit { Assert.assertEquals(0, v3OutputCommitter.abortCounter); } + // test DAGCommitSucceeded when vertex group has multiple shared outputs + @Test(timeout = 5000) + public void testDAGCommitSucceeded3_OnVertexSuccess() throws Exception { + conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, + false); + setupDAG(createDAGPlanWith2VertexGroupOutputs(true, true, true)); + initDAG(dag); + startDAG(dag); + VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); + VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2"); + VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3"); + + v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(), + TaskState.SUCCEEDED)); + Assert.assertEquals(VertexState.SUCCEEDED, v1.getState()); + Assert.assertEquals(VertexState.SUCCEEDED, v2.getState()); + Assert.assertEquals(VertexState.COMMITTING, v3.getState()); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v12OutputCommitter1 = (CountingOutputCommitter) v1 + .getOutputCommitter("v12Out1"); + v12OutputCommitter1.unblockCommit(); + CountingOutputCommitter v12OutputCommitter2 = (CountingOutputCommitter) v1 + 
.getOutputCommitter("v12Out2"); + v12OutputCommitter2.unblockCommit(); + Assert.assertEquals(DAGState.RUNNING, dag.getState()); + + CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 + .getOutputCommitter("v3Out"); + v3OutputCommitter.unblockCommit(); + waitUntil(dag, DAGState.SUCCEEDED); + Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); + + Assert.assertEquals(1, v12OutputCommitter1.initCounter); + Assert.assertEquals(1, v12OutputCommitter1.setupCounter); + Assert.assertEquals(1, v12OutputCommitter1.commitCounter); + Assert.assertEquals(0, v12OutputCommitter1.abortCounter); + + Assert.assertEquals(1, v12OutputCommitter2.initCounter); + Assert.assertEquals(1, v12OutputCommitter2.setupCounter); + Assert.assertEquals(1, v12OutputCommitter2.commitCounter); + Assert.assertEquals(0, v12OutputCommitter2.abortCounter); + + Assert.assertEquals(1, v3OutputCommitter.initCounter); + Assert.assertEquals(1, v3OutputCommitter.setupCounter); + Assert.assertEquals(1, v3OutputCommitter.commitCounter); + Assert.assertEquals(0, v3OutputCommitter.abortCounter); + } + // commit of vertex group(v1,v2) fail and commit of v3 is not completed 
@Test(timeout = 5000) public void testDAGCommitFail1_OnVertexSuccess() throws Exception { @@ -1048,6 +1252,16 @@ public class TestCommit { Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(1, v12OutputCommitter.initCounter); Assert.assertEquals(1, v12OutputCommitter.setupCounter); @@ -1097,6 +1311,16 @@ public class TestCommit { Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); CountingOutputCommitter 
v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); @@ -1152,6 +1376,16 @@ public class TestCommit { Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(1, v12OutputCommitter.initCounter); Assert.assertEquals(1, v12OutputCommitter.setupCounter); @@ -1202,6 +1436,16 @@ public class TestCommit { Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(1, 
v12OutputCommitter.initCounter); Assert.assertEquals(1, v12OutputCommitter.setupCounter); @@ -1215,7 +1459,7 @@ public class TestCommit { } @Test (timeout = 5000) - public void testDAGInternalErrorWhileCommiting() throws Exception { + public void testDAGInternalErrorWhileCommiting_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); setupDAG(createDAGPlan(true, true)); @@ -1236,6 +1480,17 @@ public class TestCommit { waitUntil(dag, DAGState.ERROR); Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 @@ -1282,6 +1537,16 @@ public class TestCommit { Assert .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + 
historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); @@ -1335,6 +1600,16 @@ public class TestCommit { Assert .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); @@ -1383,7 +1658,16 @@ public class TestCommit { Assert .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); - + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + 
historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); @@ -1424,6 +1708,17 @@ public class TestCommit { Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 @@ -1461,6 +1756,17 @@ public class TestCommit { waitUntil(dag, DAGState.ERROR); Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + 
historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 @@ -1476,14 +1782,12 @@ public class TestCommit { Assert.assertEquals(1, v3OutputCommitter.abortCounter); } - @Test(timeout = 5000) - public void testVertexGroupCommitFinishedEventFail() throws Exception { + @Test (timeout = 5000) + public void testVertexGroupCommitFinishedEventFail_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); setupDAG(createDAGPlan(true, true)); - MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext); - doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler(); - mockHistoryEventHandler.failVertexGroupCommitFinishedEvent = true; + historyEventHandler.failVertexGroupCommitFinishedEvent = true; initDAG(dag); startDAG(dag); @@ -1503,6 +1807,16 @@ public class TestCommit { .getOutputCommitter("v3Out"); v12OutputCommitter.unblockCommit(); waitUntil(dag, DAGState.FAILED); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + 
historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); Assert.assertEquals(DAGState.FAILED, dag.getState()); Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, @@ -1524,13 +1838,11 @@ public class TestCommit { } @Test(timeout = 5000) - public void testDAGCommitStartedEventFail() throws Exception { + public void testDAGCommitStartedEventFail_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); setupDAG(createDAGPlan(true, true)); - MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext); - doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler(); - mockHistoryEventHandler.failDAGCommitStartedEvent = true; + historyEventHandler.failDAGCommitStartedEvent = true; initDAG(dag); startDAG(dag); @@ -1547,6 +1859,17 @@ public class TestCommit { waitUntil(dag, DAGState.FAILED); Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, dag.getTerminationCause()); Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); + 
historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); + CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3 @@ -1567,7 +1890,7 @@ public class TestCommit { // test commit will be canceled no matter it is started or still in the threadpool // ControlledThreadPoolExecutor is used for to not schedule the commits @Test(timeout = 5000) - public void testCommitCanceled() throws Exception { + public void testCommitCanceled_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); setupDAG(createDAGPlan(true, true)); @@ -1598,6 +1921,16 @@ public class TestCommit { Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); // mean the commits have been canceled Assert.assertTrue(dag.commitFutures.isEmpty()); + historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0); + historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0); + historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1); + historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0); + historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1); + historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1); + historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1); CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1 .getOutputCommitter("v12Out"); @@ -1635,7 +1968,7 @@ public class TestCommit { public boolean failVertexGroupCommitFinishedEvent = false; public boolean failDAGCommitStartedEvent = false; - + public List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>(); public MockHistoryEventHandler(AppContext context) { 
super(context); } @@ -1650,6 +1983,85 @@ public class TestCommit { && failDAGCommitStartedEvent) { throw new IOException("fail DAGCommitStartedEvent"); } + historyEvents.add(event.getHistoryEvent()); + } + + public void verifyVertexGroupCommitStartedEvent(String groupName, int expectedTimes) { + int actualTimes = 0; + for (HistoryEvent event : historyEvents) { + if (event.getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_STARTED) { + VertexGroupCommitStartedEvent startedEvent = (VertexGroupCommitStartedEvent)event; + if (startedEvent.getVertexGroupName().equals(groupName)) { + actualTimes ++; + } + } + } + Assert.assertEquals(expectedTimes, actualTimes); + } + + public void verifyVertexGroupCommitFinishedEvent(String groupName, int expectedTimes) { + int actualTimes = 0; + for (HistoryEvent event : historyEvents) { + if (event.getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED) { + VertexGroupCommitFinishedEvent finishedEvent = (VertexGroupCommitFinishedEvent)event; + if (finishedEvent.getVertexGroupName().equals(groupName)) { + actualTimes ++; + } + } + } + Assert.assertEquals(expectedTimes, actualTimes); + } + + public void verifyVertexCommitStartedEvent(TezVertexID vertexId, int expectedTimes) { + int actualTimes = 0; + for (HistoryEvent event : historyEvents) { + if (event.getEventType() == HistoryEventType.VERTEX_COMMIT_STARTED) { + VertexCommitStartedEvent startedEvent = (VertexCommitStartedEvent)event; + if (startedEvent.getVertexID().equals(vertexId)) { + actualTimes ++; + } + } + } + Assert.assertEquals(expectedTimes, actualTimes); + } + + public void verifyVertexFinishedEvent(TezVertexID vertexId, int expectedTimes) { + int actualTimes = 0; + for (HistoryEvent event : historyEvents) { + if (event.getEventType() == HistoryEventType.VERTEX_FINISHED) { + VertexFinishedEvent finishedEvent = (VertexFinishedEvent)event; + if (finishedEvent.getVertexID().equals(vertexId)) { + actualTimes ++; + } + } + } + Assert.assertEquals(expectedTimes, 
actualTimes); + } + + public void verifyDAGCommitStartedEvent(TezDAGID dagId, int expectedTimes) { + int actualTimes = 0; + for (HistoryEvent event : historyEvents) { + if (event.getEventType() == HistoryEventType.DAG_COMMIT_STARTED) { + DAGCommitStartedEvent startedEvent = (DAGCommitStartedEvent)event; + if (startedEvent.getDagID().equals(dagId)) { + actualTimes ++; + } + } + } + Assert.assertEquals(expectedTimes, actualTimes); + } + + public void verifyDAGFinishedEvent(TezDAGID dagId, int expectedTimes) { + int actualTimes = 0; + for (HistoryEvent event : historyEvents) { + if (event.getEventType() == HistoryEventType.DAG_FINISHED) { + DAGFinishedEvent startedEvent = (DAGFinishedEvent)event; + if (startedEvent.getDagID().equals(dagId)) { + actualTimes ++; + } + } + } + Assert.assertEquals(expectedTimes, actualTimes); } }