tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized (zjffdu)
Date Tue, 23 Jun 2015 02:40:06 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 018294565 -> 01b94a3d9


TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized (zjffdu)

(cherry picked from commit 142bd428b44019a5199bec82de5d3d017a5489e7)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/01b94a3d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/01b94a3d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/01b94a3d

Branch: refs/heads/branch-0.7
Commit: 01b94a3d90ecddc0bcf64783eb73fb2cac523720
Parents: 0182945
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Tue Jun 23 10:11:34 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Tue Jun 23 10:39:49 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 .../java/org/apache/tez/dag/app/dag/impl/VertexImpl.java     | 8 ++++++--
 .../java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java | 6 ++++--
 3 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/01b94a3d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c6cf731..d672de6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2568. V_INPUT_DATA_INFORMATION may happen after vertex is initialized
   TEZ-2291. TEZ UI: Improper vertex name in tables.
   TEZ-2567. Tez UI: download dag data does not work within ambari
   TEZ-2545. It is not necessary to start the vertex group commit when DAG is in TERMINATING

http://git-wip-us.apache.org/repos/asf/tez/blob/01b94a3d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e909c9f..aa8f593 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -728,6 +728,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private final List<OutputSpec> additionalOutputSpecs = new ArrayList<OutputSpec>();
   private Set<String> inputsWithInitializers;
   private int numInitializedInputs;
+  @VisibleForTesting
+  int numInitializerCompletionsHandled = 0;
   private boolean startSignalPending = false;
   // We may always store task events in the vertex for scalability
   List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
@@ -3359,6 +3361,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      vertex.numInitializerCompletionsHandled++;
       VertexEventInputDataInformation iEvent = (VertexEventInputDataInformation) event;
       List<TezEvent> inputInfoEvents = iEvent.getEvents();
       try {
@@ -3375,8 +3378,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
       // done. check if we need to do the initialization
       if (vertex.getState() == VertexState.INITIALIZING && vertex.initWaitsForRootInitializers) {
-        if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()) {
-          // set the wait flag to false if all initializers are done
+        if (vertex.numInitializedInputs == vertex.inputsWithInitializers.size()
+            && vertex.numInitializerCompletionsHandled == vertex.inputsWithInitializers.size()) {
+          // set the wait flag to false if all initializers are done and InputDataInformation are received from VM
           vertex.initWaitsForRootInitializers = false;
         }
         // initialize vertex if possible and needed

http://git-wip-us.apache.org/repos/asf/tez/blob/01b94a3d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index aeea407..8b2a1b4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -4680,11 +4680,12 @@ public class TestVertexImpl {
     initializerManager1.completeInputInitialization(0, 5, v1Hints);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-
+    Assert.assertEquals(1, v1.numInitializerCompletionsHandled);
     // Complete second initializer
     initializerManager1.completeInputInitialization(1);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(2, v1.numInitializerCompletionsHandled);
   }
 
   @Test(timeout = 5000)
@@ -4709,11 +4710,12 @@ public class TestVertexImpl {
     initializerManager1.completeInputInitialization(1);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
-
+    Assert.assertEquals(1, v1.numInitializerCompletionsHandled);
     // Complete second initializer which sets parallelism
     initializerManager1.completeInputInitialization(0, 5, v1Hints);
     dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(2, v1.numInitializerCompletionsHandled);
   }
 
   @Test(timeout = 500000)


Mime
View raw message