tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery. (Jeff Zhang via hitesh)
Date Wed, 29 Apr 2015 21:53:39 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 0c690e392 -> c5f57ccf6


TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery. (Jeff
Zhang via hitesh)

(cherry picked from commit 6b6834e823e1649dc5539adbe3a40b87adfc4648)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c5f57ccf
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c5f57ccf
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c5f57ccf

Branch: refs/heads/branch-0.6
Commit: c5f57ccf67dc7fa499e5095a0bed1f4759d3aa33
Parents: 0c690e3
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Apr 29 14:47:48 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Apr 29 14:48:21 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 +-
 .../dag/app/dag/impl/TestVertexRecovery.java    | 124 ++++++++++++++++++-
 3 files changed, 124 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c5f57ccf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 842f5fa..9067c86 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -183,6 +183,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery.
   TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path
   TEZ-2303. ConcurrentModificationException while processing recovery.
   TEZ-2334. ContainerManagementProtocolProxy modifies IPC timeout conf without making a copy.

http://git-wip-us.apache.org/repos/asf/tez/blob/c5f57ccf/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b94c781..b01d05c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -294,7 +294,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   new StartRecoverTransition())
           .addTransition
               (VertexState.NEW,
-                  EnumSet.of(VertexState.INITED,
+                  EnumSet.of(VertexState.NEW, VertexState.INITED,
                       VertexState.INITIALIZING, VertexState.RUNNING,
                       VertexState.SUCCEEDED, VertexState.FAILED,
                       VertexState.KILLED, VertexState.ERROR,

http://git-wip-us.apache.org/repos/asf/tez/blob/c5f57ccf/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 664382a..6c5e111 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -262,6 +262,94 @@ public class TestVertexRecovery {
     return dag;
   }
 
+  /*
+   * v1 -> v2: plan with vertex1 (1 task) feeding vertex2 (2 tasks) over the
+   * SEQUENTIAL, PERSISTED, SCATTER_GATHER edge "e1"; each vertex declares an
+   * output "outputx" whose controller class is CountingOutputCommitter.
+   */
+  private DAGPlan createDAGPlanMR() {
+    DAGPlan dag =
+        DAGPlan
+            .newBuilder()
+            .setName("testverteximpl")
+            .addVertex(
+                VertexPlan
+                    .newBuilder()
+                    .setName("vertex1")
+                    .setType(PlanVertexType.NORMAL)
+                    .addTaskLocationHint(
+                        PlanTaskLocationHint.newBuilder().addHost("host1")
+                            .addRack("rack1").build())
+                    .setTaskConfig(
+                        PlanTaskConfiguration.newBuilder().setNumTasks(1)
+                            .setVirtualCores(4).setMemoryMb(1024)
+                            .setJavaOpts("").setTaskModule("x1.y1").build())
+                    .addOutEdgeId("e1")
+                    .addOutputs(
+                        DAGProtos.RootInputLeafOutputProto
+                            .newBuilder()
+                            .setIODescriptor(
+                                TezEntityDescriptorProto.newBuilder()
+                                    .setClassName("output").build())
+                            .setName("outputx")
+                            .setControllerDescriptor(
+                                TezEntityDescriptorProto
+                                    .newBuilder()
+                                    .setClassName(
+                                        CountingOutputCommitter.class.getName())))
+                    .build())
+            .addVertex(
+                VertexPlan
+                    .newBuilder()
+                    .setName("vertex2")
+                    .setType(PlanVertexType.NORMAL)
+                    .setProcessorDescriptor(
+                        TezEntityDescriptorProto.newBuilder().setClassName(
+                            "x2.y2"))
+                    .addTaskLocationHint(
+                        PlanTaskLocationHint.newBuilder().addHost("host2")
+                            .addRack("rack2").build())
+                    .setTaskConfig(
+                        PlanTaskConfiguration.newBuilder().setNumTasks(2)
+                            .setVirtualCores(4).setMemoryMb(1024)
+                            .setJavaOpts("foo").setTaskModule("x2.y2").build())
+                    .addInEdgeId("e1")
+                    .addOutputs(
+                        DAGProtos.RootInputLeafOutputProto
+                            .newBuilder()
+                            .setIODescriptor(
+                                TezEntityDescriptorProto.newBuilder()
+                                    .setClassName("output").build())
+                            .setName("outputx")
+                            .setControllerDescriptor(
+                                TezEntityDescriptorProto
+                                    .newBuilder()
+                                    .setClassName(
+                                        CountingOutputCommitter.class.getName())))
+                    .build()
+
+            )
+            .addEdge(
+                EdgePlan
+                    .newBuilder()
+                    .setEdgeDestination(
+                        TezEntityDescriptorProto.newBuilder().setClassName(
+                            "i2_v1"))
+                    .setInputVertexName("vertex1")
+                    .setEdgeSource(
+                        TezEntityDescriptorProto.newBuilder()
+                            .setClassName("o1"))
+                    .setOutputVertexName("vertex2")
+                    .setDataMovementType(
+                        PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1")
+                    .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                    .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                    .build())
+            .build();
+
+    return dag;
+  }
+
   class VertexEventHanlder implements EventHandler<VertexEvent> {
 
     private List<VertexEvent> events = new ArrayList<VertexEvent>();
@@ -692,7 +780,8 @@ public class TestVertexRecovery {
 
   /**
    * vertex1 (New) -> StartRecoveryTransition <br>
-   * vertex2 (New) -> StartRecoveryTransition vertex3 (New) -> RecoverTransition
+   * vertex2 (New) -> StartRecoveryTransition <br>
+   * vertex3 (New) -> RecoverTransition
    */
   @Test(timeout = 5000)
   public void testRecovery_RecoveringFromNew() {
@@ -741,7 +830,38 @@ public class TestVertexRecovery {
 
   }
   
-  
+  /**
+   * vertex1 (New) -> StartRecoveryTransition <br>
+   * vertex2 (New) -> RecoveryTransition <br>
+   */
+  @Test
+  public void testMRDAG() {
+    DAGPlan dagPlan = createDAGPlanMR();
+    dag =
+        new DAGImpl(dagId, new Configuration(), dagPlan,
+            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+            new Credentials(), new SystemClock(), user,
+            mock(TaskHeartbeatHandler.class), mockAppContext);
+    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
+    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
+
+    VertexImpl vertex1 = (VertexImpl)dag.getVertex("vertex1");
+    VertexImpl vertex2 = (VertexImpl)dag.getVertex("vertex2");
+    assertEquals(VertexState.NEW, vertex1.getState());
+    assertEquals(VertexState.NEW, vertex2.getState());
+
+    // vertex1 handles the recover event while in NEW; vertex2 (its downstream
+    // vertex) is expected to handle the source-vertex-recovered event in NEW
+    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
+        VertexState.RUNNING));
+    dispatcher.await();
+    assertEquals(VertexState.RUNNING, vertex1.getState());
+    assertEquals(1, vertex1.getTasks().size());
+    // verify vertex1's OutputCommitter is initialized
+    assertOutputCommitters(vertex1);
+    assertEquals(VertexState.RUNNING, vertex2.getState());
+  }
+
   @Test(timeout = 5000)
   public void testRecovery_VertexManagerErrorOnRecovery() {
     VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");


Mime
View raw message