tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject git commit: TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks (Rajesh Balamohan)
Date Fri, 19 Sep 2014 06:13:05 GMT
Repository: tez
Updated Branches:
  refs/heads/master 625450cf1 -> 82ec16baf


TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks
(Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/82ec16ba
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/82ec16ba
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/82ec16ba

Branch: refs/heads/master
Commit: 82ec16baf890a6914337544ff3f6a7715aa1b86e
Parents: 625450c
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Fri Sep 19 11:42:45 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Fri Sep 19 11:42:45 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/impl/ImmediateStartVertexManager.java   |  9 +++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 45 ++++++++++++++++++++
 3 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/82ec16ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5e2c2cd..3ae8f59 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,7 @@ ALL CHANGES
   TEZ-1585. Memory leak in tez session mode.
   TEZ-1533. Request Events more often if a complete set of events is received by a task.
   TEZ-1587. Some tez-examples fail in local mode.
+  TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks.
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/82ec16ba/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index ac2b851..773426b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -64,7 +64,14 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
     for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
       String srcVertex = entry.getKey();
       EdgeProperty edgeProp = entry.getValue();
-      srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+      LOG.info("Task count in " + srcVertex + ": " + getContext().getVertexNumTasks(srcVertex));
+      //track vertices with task count > 0
+      if (getContext().getVertexNumTasks(srcVertex) > 0) {
+        srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
+      } else {
+        LOG.info("Vertex: " + getContext().getVertexName() + "; Ignoring " + srcVertex
+            + " as it has got 0 tasks");
+      }
     }
 
     //handle completions

http://git-wip-us.apache.org/repos/asf/tez/blob/82ec16ba/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c003e05..5c00fec 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -3642,6 +3642,8 @@ public class TestVertexImpl {
      *   M7 --(B)---------------->M5 ---(SG)--> R6
      *                            /
      *   M8---(C)--------------->/
+     *                          /
+     *   M9---(B)--------------> (zero task vertex)
      */
 
     //init M2
@@ -3650,15 +3652,23 @@ public class TestVertexImpl {
     VertexImpl r3 = vertices.get("R3");
     VertexImpl m5 = vertices.get("M5");
     VertexImpl m8 = vertices.get("M8");
+    VertexImpl m9 = vertices.get("M9");
 
     initVertex(m2);
     initVertex(m7);
     initVertex(m8);
+    initVertex(m9);
     assertTrue(m7.getState().equals(VertexState.INITED));
+    assertTrue(m9.getState().equals(VertexState.INITED));
     assertTrue(m5.getState().equals(VertexState.INITED));
     assertTrue(m8.getState().equals(VertexState.INITED));
     assertTrue(m7.getVertexManager().getPlugin() instanceof ImmediateStartVertexManager);
 
+    //Start M9
+    dispatcher.getEventHandler().handle(new VertexEvent(m9.getVertexId(),
+        VertexEventType.V_START));
+    dispatcher.await();
+
     //Start M2; Let tasks complete in M2; Also let 1 task complete in R3
     dispatcher.getEventHandler().handle(new VertexEvent(m2.getVertexId(), VertexEventType.V_START));
     dispatcher.await();
@@ -3708,6 +3718,8 @@ public class TestVertexImpl {
     dispatcher.getEventHandler().handle(new VertexEvent(m8.getVertexId(),VertexEventType.V_START));
     dispatcher.await();
 
+    assertTrue(m9.getState().equals(VertexState.SUCCEEDED));
+
     //M5 in running state. But tasks should not be scheduled until M8 finishes a task.
     assertTrue(m5.getState().equals(VertexState.RUNNING));
     for(Task task : m5.getTasks().values()) {
@@ -3740,6 +3752,8 @@ public class TestVertexImpl {
      *   M7 --(B)---------------->M5 ---(SG)--> R6
      *                            /
      *   M8---(C)--------------->/
+     *                          /
+     *   M9---(B)--------------> (zero task vertex)
      */
     DAGPlan dag = DAGPlan.newBuilder().setName("TestSamplerDAG")
         .addVertex(VertexPlan.newBuilder()
@@ -3778,6 +3792,24 @@ public class TestVertexImpl {
                 .addOutEdgeId("M8_M5")
                 .build()
         )
+        .addVertex(VertexPlan.newBuilder()
+                .setName("M9")
+                .setProcessorDescriptor(
+                    TezEntityDescriptorProto.newBuilder().setClassName("M9.class"))
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(
+                    PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build())
+                .setTaskConfig(PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(0) //Zero task vertex
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("M9.class")
+                        .build()
+                )
+                .addOutEdgeId("M9_M5")
+                .build()
+        )
          .addVertex(VertexPlan.newBuilder()
                  .setName("R3")
                  .setProcessorDescriptor(
@@ -3815,6 +3847,7 @@ public class TestVertexImpl {
                 .addInEdgeId("R3_M5")
                 .addInEdgeId("M7_M5")
                 .addInEdgeId("M8_M5")
+                .addInEdgeId("M9_M5")
                 .addOutEdgeId("M5_R6")
                 .build()
         )
@@ -3904,6 +3937,18 @@ public class TestVertexImpl {
         )
         .addEdge(
             EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M9_M5"))
+                .setInputVertexName("M9")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M9_M5.class"))
+                .setOutputVertexName("M5")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("M9_M5")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
                 .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5"))
                 .setInputVertexName("M8")
                 .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5.class"))


Mime
View raw message