Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B90D5175EA for ; Wed, 29 Apr 2015 21:53:39 +0000 (UTC) Received: (qmail 33915 invoked by uid 500); 29 Apr 2015 21:53:39 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 33879 invoked by uid 500); 29 Apr 2015 21:53:39 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 33870 invoked by uid 99); 29 Apr 2015 21:53:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 21:53:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7A9F1DFF8C; Wed, 29 Apr 2015 21:53:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Message-Id: <9f5e4d97890b4347815b841a7e43cc3d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery. (Jeff Zhang via hitesh) Date: Wed, 29 Apr 2015 21:53:39 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.6 0c690e392 -> c5f57ccf6 TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery. 
(Jeff Zhang via hitesh) (cherry picked from commit 6b6834e823e1649dc5539adbe3a40b87adfc4648) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c5f57ccf Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c5f57ccf Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c5f57ccf Branch: refs/heads/branch-0.6 Commit: c5f57ccf67dc7fa499e5095a0bed1f4759d3aa33 Parents: 0c690e3 Author: Hitesh Shah Authored: Wed Apr 29 14:47:48 2015 -0700 Committer: Hitesh Shah Committed: Wed Apr 29 14:48:21 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +- .../dag/app/dag/impl/TestVertexRecovery.java | 124 ++++++++++++++++++- 3 files changed, 124 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c5f57ccf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 842f5fa..9067c86 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -183,6 +183,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery. TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path TEZ-2303. ConcurrentModificationException while processing recovery. TEZ-2334. ContainerManagementProtocolProxy modifies IPC timeout conf without making a copy. 
http://git-wip-us.apache.org/repos/asf/tez/blob/c5f57ccf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index b94c781..b01d05c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -294,7 +294,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, new StartRecoverTransition()) .addTransition (VertexState.NEW, - EnumSet.of(VertexState.INITED, + EnumSet.of(VertexState.NEW, VertexState.INITED, VertexState.INITIALIZING, VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.ERROR, http://git-wip-us.apache.org/repos/asf/tez/blob/c5f57ccf/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java index 664382a..6c5e111 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java @@ -262,6 +262,94 @@ public class TestVertexRecovery { return dag; } + /* + * v1 + * | + * v2 + */ + private DAGPlan createDAGPlanMR() { + DAGPlan dag = + DAGPlan + .newBuilder() + .setName("testverteximpl") + .addVertex( + VertexPlan + .newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder().addHost("host1") + .addRack("rack1").build()) + .setTaskConfig( + PlanTaskConfiguration.newBuilder().setNumTasks(1) + .setVirtualCores(4).setMemoryMb(1024) + 
.setJavaOpts("").setTaskModule("x1.y1").build()) + .addOutEdgeId("e1") + .addOutputs( + DAGProtos.RootInputLeafOutputProto + .newBuilder() + .setIODescriptor( + TezEntityDescriptorProto.newBuilder() + .setClassName("output").build()) + .setName("outputx") + .setControllerDescriptor( + TezEntityDescriptorProto + .newBuilder() + .setClassName( + CountingOutputCommitter.class.getName()))) + .build()) + .addVertex( + VertexPlan + .newBuilder() + .setName("vertex2") + .setType(PlanVertexType.NORMAL) + .setProcessorDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + "x2.y2")) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder().addHost("host2") + .addRack("rack2").build()) + .setTaskConfig( + PlanTaskConfiguration.newBuilder().setNumTasks(2) + .setVirtualCores(4).setMemoryMb(1024) + .setJavaOpts("foo").setTaskModule("x2.y2").build()) + .addInEdgeId("e1") + .addOutputs( + DAGProtos.RootInputLeafOutputProto + .newBuilder() + .setIODescriptor( + TezEntityDescriptorProto.newBuilder() + .setClassName("output").build()) + .setName("outputx") + .setControllerDescriptor( + TezEntityDescriptorProto + .newBuilder() + .setClassName( + CountingOutputCommitter.class.getName()))) + .build() + + ) + .addEdge( + EdgePlan + .newBuilder() + .setEdgeDestination( + TezEntityDescriptorProto.newBuilder().setClassName( + "i2_v1")) + .setInputVertexName("vertex1") + .setEdgeSource( + TezEntityDescriptorProto.newBuilder() + .setClassName("o1")) + .setOutputVertexName("vertex2") + .setDataMovementType( + PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build()) + .build(); + + return dag; + } + class VertexEventHanlder implements EventHandler { private List events = new ArrayList(); @@ -692,7 +780,8 @@ public class TestVertexRecovery { /** * vertex1 (New) -> StartRecoveryTransition
- * vertex2 (New) -> StartRecoveryTransition vertex3 (New) -> RecoverTransition + * vertex2 (New) -> StartRecoveryTransition
+ * vertex3 (New) -> RecoverTransition */ @Test(timeout = 5000) public void testRecovery_RecoveringFromNew() { @@ -741,7 +830,38 @@ public class TestVertexRecovery { } - + /** + * vertex1 (New) -> StartRecoveryTransition
+ * vertex2 (New) -> RecoverTransition

+ */ + @Test(timeout = 5000) + public void testMRDAG() { + DAGPlan dagPlan = createDAGPlanMR(); + dag = + new DAGImpl(dagId, new Configuration(), dagPlan, + dispatcher.getEventHandler(), mock(TaskAttemptListener.class), + new Credentials(), new SystemClock(), user, + mock(TaskHeartbeatHandler.class), mockAppContext); + when(mockAppContext.getCurrentDAG()).thenReturn(dag); + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT)); + + VertexImpl vertex1 = (VertexImpl)dag.getVertex("vertex1"); + VertexImpl vertex2 = (VertexImpl)dag.getVertex("vertex2"); + assertEquals(VertexState.NEW, vertex1.getState()); + assertEquals(VertexState.NEW, vertex2.getState()); + + // vertex1 handles the recover-vertex event while in state NEW; + // vertex2 then handles V_SOURCE_VERTEX_RECOVERED while still in state NEW + vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), + VertexState.RUNNING)); + dispatcher.await(); + assertEquals(VertexState.RUNNING, vertex1.getState()); + assertEquals(1, vertex1.getTasks().size()); + // verify OutputCommitter is initialized + assertOutputCommitters(vertex1); + assertEquals(VertexState.RUNNING, vertex2.getState()); + } + @Test(timeout = 5000) public void testRecovery_VertexManagerErrorOnRecovery() { VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");