tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1171. Vertex remains in INITED state if all source vertices start while the vertex was in INITIALIZING state (bikas)
Date Tue, 03 Jun 2014 02:05:16 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master af9bd5bde -> cf073633e


TEZ-1171. Vertex remains in INITED state if all source vertices start while the vertex was
in INITIALIZING state (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cf073633
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cf073633
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cf073633

Branch: refs/heads/master
Commit: cf073633e89076e005ef80d996d000390820cd8c
Parents: af9bd5b
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Jun 2 19:04:52 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Jun 2 19:04:52 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 39 ++++++++++++-----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 45 +++++++++++++++++++-
 2 files changed, 71 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cf073633/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6c9eba1..21c3cc1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2750,19 +2750,31 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.numStartedSourceVertices++;
       LOG.info("Source vertex started: " + startEvent.getSourceVertexId() +
           " for vertex: " + vertex.getVertexId() + " numStartedSources: " + 
-          vertex.numStartedSourceVertices);
+          vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size());
       vertex.startIfPossible();
     }
   }
 
+  boolean hasSourceVertexDependency() {
+    return (sourceVertices != null && sourceVertices.size() > 0);
+  }
+  
   boolean canStartVertex() {
-    if (getState() != VertexState.INITED) {
-      LOG.info("Cannot start vertex. Not in inited state. " + logIdentifier + 
-          " . VertesState: " + getState());
-      return false;
-    }
     if ((sourceVertices == null || numStartedSourceVertices == sourceVertices.size())
         && uninitializedEdges.isEmpty()) {
+      // vertex meets external start dependency conditions
+      if (hasSourceVertexDependency()) {
+        // this vertex is not going to receive a direct external start event from DAG
+        startSignalPending = true;
+      }
+      if (getState() != VertexState.INITED) {
+        // vertex itself is not ready to start. External dependencies have already
+        // notified us. So save that notification so that we can start when we 
+        // ourselves are ready internally.
+        LOG.info("Cannot start vertex. Not in inited state. " + logIdentifier + 
+            " . VertesState: " + getState() + " numTasks: " + numTasks);
+        return false;
+      }
       // vertex is inited and all dependencies are ready. Inited vertex means 
       // parallelism must be set already
       Preconditions
@@ -2782,11 +2794,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     if (canStartVertex()) {
       Preconditions.checkState(getState() == VertexState.INITED, 
           "Vertex must be inited " + logIdentifier);
-      LOG.info("Starting vertex: " + getVertexId() +
-               " with name: " + getName() +
-               " with distanceFromRoot: " + distanceFromRoot );
-      eventHandler.handle(new VertexEvent(vertexId,
-          VertexEventType.V_START));
+      if (startSignalPending) {
+        LOG.info("Starting vertex: " + getVertexId() +
+                 " with name: " + getName() +
+                 " with distanceFromRoot: " + distanceFromRoot );
+        eventHandler.handle(new VertexEvent(vertexId,
+            VertexEventType.V_START));
+      }
     }
   }
 
@@ -2821,6 +2835,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       // this is to handle the initial vertices who are directly sent a V_START
       // from the DAG. They may have uninitialized edges that may be initialized
       // when the downstream vertices initialize
+      LOG.info("Received START event. Saving notification so that we can start " +
+      		"when other requirements are met for vertex: " + logIdentifier);
+      startSignalPending = true;
       return VertexState.INITED;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cf073633/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 84fec0b..06b9b47 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -1145,8 +1145,39 @@ public class TestVertexImpl {
                 )
                 .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
                     .setClassName(VertexManagerPluginForTest.class.getName()))
+                .addOutEdgeId("2_3")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("v3")
+                .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class"))
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(-1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("A.class")
+                        .build()
+                )
+                .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+                    .setClassName(VertexManagerPluginForTest.class.getName()))
+                .addInEdgeId("2_3")
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("2_3"))
+                .setInputVertexName("v2")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("2_3.class"))
+                .setOutputVertexName("v3")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("2_3")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
                 .build()
-
         ).build();
     
     return dag;
@@ -2240,7 +2271,7 @@ public class TestVertexImpl {
   }
   
   @SuppressWarnings("unchecked")
-  @Test//(timeout = 5000)
+  @Test(timeout = 5000)
   public void testVertexInitWithCustomVertexManager() {
     setupPreDagCreation();
     dagPlan = createDAGWithCustomVertexManager();
@@ -2249,6 +2280,7 @@ public class TestVertexImpl {
     int numTasks = 3;
     VertexImpl v1 = vertices.get("v1");
     VertexImpl v2 = vertices.get("v2");
+    VertexImpl v3 = vertices.get("v3");
     initVertex(v1);
     initVertex(v2);
     dispatcher.await();
@@ -2257,6 +2289,8 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
     Assert.assertEquals(-1, v2.getTotalTasks());
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+    Assert.assertEquals(-1, v3.getTotalTasks());
+    Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
     // vertex should not start since parallelism is not set
     dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
     dispatcher.await();
@@ -2276,6 +2310,13 @@ public class TestVertexImpl {
     dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(), VertexEventType.V_START));
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v2.getState());
+    // v3 still initializing with source vertex started. So should start running
+    // once num tasks is defined
+    Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+    v3.setParallelism(numTasks, null, null);
+    dispatcher.await();
+    Assert.assertEquals(numTasks, v3.getTotalTasks());
+    Assert.assertEquals(VertexState.RUNNING, v3.getState());
   }
 
   @Test(timeout = 5000)


Mime
View raw message