tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly (bikas) (cherry picked from commit b804b8f03e9d7934202f3838841fe1abb416f480)
Date Thu, 05 Feb 2015 04:08:47 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 50b24c249 -> 363249d64


TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly (bikas)
(cherry picked from commit b804b8f03e9d7934202f3838841fe1abb416f480)

Conflicts: Resolved by keeping incoming change
	tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/363249d6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/363249d6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/363249d6

Branch: refs/heads/branch-0.5
Commit: 363249d6455d34605c361480685d41165b9118b1
Parents: 50b24c2
Author: Bikas Saha <bikas@apache.org>
Authored: Wed Feb 4 19:48:19 2015 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Wed Feb 4 20:07:16 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../event/VertexEventOneToOneSourceSplit.java   |  50 ---------
 .../tez/dag/app/dag/event/VertexEventType.java  |   1 -
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 111 +------------------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  39 +++----
 .../vertexmanager/InputReadyVertexManager.java  |  80 ++++++++-----
 .../TestInputReadyVertexManager.java            |  64 ++++++-----
 7 files changed, 108 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/363249d6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 143b822..5a9e06d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2020. For 1-1 edge vertex configured event may be sent incorrectly
   TEZ-2015. VertexImpl.doneReconfiguringVertex() should check other criteria
   before sending notification
   TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism 

http://git-wip-us.apache.org/repos/asf/tez/blob/363249d6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
deleted file mode 100644
index a7e580e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.records.TezVertexID;
-
-public class VertexEventOneToOneSourceSplit extends VertexEvent {
-  final int numTasks;
-  final TezVertexID originalSplitVertex;
-  final TezVertexID senderVertex;
-  
-  public VertexEventOneToOneSourceSplit(TezVertexID vertexId,
-      TezVertexID senderVertex,
-      TezVertexID originalSplitVertex,
-      int numTasks) {
-    super(vertexId, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT);
-    this.numTasks = numTasks;
-    this.senderVertex = senderVertex;
-    this.originalSplitVertex = originalSplitVertex;
-  }
-  
-  public int getNumTasks() {
-    return numTasks;
-  }
-  
-  public TezVertexID getOriginalSplitSource() {
-    return originalSplitVertex;
-  }
-  
-  public TezVertexID getSenderVertex() {
-    return senderVertex;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/363249d6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index b4f7e29..5c36348 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -45,7 +45,6 @@ public enum VertexEventType {
   V_MANAGER_USER_CODE_ERROR,
   
   V_ROUTE_EVENT,
-  V_ONE_TO_ONE_SOURCE_SPLIT,
   
   //Producer: VertexInputInitializer
   V_ROOT_INPUT_INITIALIZED,

http://git-wip-us.apache.org/repos/asf/tez/blob/363249d6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 7684afc..8e47d4b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -115,7 +115,6 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
-import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -336,10 +335,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexEventType.V_ROOT_INPUT_INITIALIZED,
               new RootInputInitializedTransition())
           .addTransition(VertexState.INITIALIZING,
-              EnumSet.of(VertexState.INITIALIZING),
-              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
-              new OneToOneSourceSplitTransition())
-          .addTransition(VertexState.INITIALIZING,
               EnumSet.of(VertexState.INITED, VertexState.FAILED),
               VertexEventType.V_READY_TO_INIT,
               new VertexInitializedTransition())
@@ -393,10 +388,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               new SourceVertexStartedTransition())
           .addTransition(VertexState.INITED,
               EnumSet.of(VertexState.INITED),
-              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
-              new OneToOneSourceSplitTransition())
-          .addTransition(VertexState.INITED,
-              EnumSet.of(VertexState.INITED),
               VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition(VertexState.INITED,
@@ -436,10 +427,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexState.ERROR),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
-          .addTransition(VertexState.RUNNING,
-              EnumSet.of(VertexState.RUNNING),
-              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
-              new OneToOneSourceSplitTransition())
           .addTransition(VertexState.RUNNING, VertexState.TERMINATING,
               VertexEventType.V_TERMINATE,
               new VertexKilledTransition())
@@ -538,7 +525,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
-                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_NULL_EDGE_INITIALIZED,
@@ -561,7 +547,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -582,7 +567,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_MANAGER_USER_CODE_ERROR,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_INTERNAL_ERROR,
@@ -684,7 +668,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private final UserGroupInformation dagUgi;
 
   private boolean parallelismSet = false;
-  private TezVertexID originalOneToOneSplitSource = null;
 
   private AtomicBoolean committed = new AtomicBoolean(false);
   private AtomicBoolean aborted = new AtomicBoolean(false);
@@ -1466,21 +1449,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
       }
 
-      for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) {
-        Edge edge = entry.getValue();
-        if (edge.getEdgeProperty().getDataMovementType()
-            == DataMovementType.ONE_TO_ONE) {
-          // inform these target vertices that we have changed parallelism
-          VertexEventOneToOneSourceSplit event =
-              new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(),
-                  getVertexId(),
-                  ((originalOneToOneSplitSource!=null) ?
-                      originalOneToOneSplitSource : getVertexId()),
-                  numTasks);
-          getEventHandler().handle(event);
-        }
-      }
-
     } finally {
       writeLock.unlock();
     }
@@ -1501,21 +1469,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   
   @Override
   public void vertexReconfigurationPlanned() {
-    vertexReconfigurationPlanned(false);
-  }
-  
-  public void vertexReconfigurationPlanned(boolean testOverride) {
     writeLock.lock();
     try {
-      if (testOverride) {
-        Preconditions.checkState(vmIsInitialized.get() && completelyConfiguredSent.get(),
-            "test should override only failed cases");
-      } else {
-        Preconditions.checkState(!vmIsInitialized.get(),
-            "context.vertexReconfigurationPlanned() cannot be called after initialize()");
-        Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
-            + " cannot be invoked after the vertex has been configured.");
-      }
+      Preconditions.checkState(!vmIsInitialized.get(),
+          "context.vertexReconfigurationPlanned() cannot be called after initialize()");
+      Preconditions.checkState(!completelyConfiguredSent.get(), "vertexReconfigurationPlanned() "
+          + " cannot be invoked after the vertex has been configured.");
       this.vertexToBeReconfiguredByManager = true;
     } finally {
       writeLock.unlock();
@@ -3107,68 +3066,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  public static class OneToOneSourceSplitTransition implements
-    MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
-    @Override
-    public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      VertexEventOneToOneSourceSplit splitEvent =
-          (VertexEventOneToOneSourceSplit)event;
-      TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
-
-      if (vertex.originalOneToOneSplitSource != null) {
-        VertexState state = vertex.getState();
-        Preconditions
-            .checkState(
-                (state == VertexState.INITIALIZING
-                    || state == VertexState.INITED || state == VertexState.RUNNING),
-                " Unexpected 1-1 split for vertex " + vertex.getVertexId()
-                    + " in state " + vertex.getState() + " . Split in vertex "
-                    + originalSplitSource + " sent by vertex "
-                    + splitEvent.getSenderVertex() + " numTasks "
-                    + splitEvent.getNumTasks());
-        if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) {
-          // ignore another split event that may have come from a different
-          // path in the DAG. We have already split because of that source
-          LOG.info("Ignoring split of vertex " + vertex.getVertexId() +
-              " because of split in vertex " + originalSplitSource +
-              " sent by vertex " + splitEvent.getSenderVertex() +
-              " numTasks " + splitEvent.getNumTasks());
-          return state;
-        }
-        // cannot split from multiple sources
-        throw new TezUncheckedException("Vertex: " + vertex.getVertexId() +
-            " asked to split by: " + originalSplitSource +
-            " but was already split by:" + vertex.originalOneToOneSplitSource);
-      }
-
-      LOG.info("Splitting vertex " + vertex.getVertexId() +
-          " because of split in vertex " + originalSplitSource +
-          " sent by vertex " + splitEvent.getSenderVertex() +
-          " numTasks " + splitEvent.getNumTasks());
-      vertex.originalOneToOneSplitSource = originalSplitSource;
-      try {
-        vertex.setParallelism(splitEvent.getNumTasks(), null, null, null, false);
-      } catch (Exception e) {
-        // ingore this exception, should not happen
-        LOG.error("Unexpected exception, Just set Parallelims to a specified value, not involve EdgeManager,"
-            + "exception should not happen here", e);
-      }
-      if (vertex.getState() == VertexState.RUNNING ||
-          vertex.getState() == VertexState.INITED) {
-        return vertex.getState();
-      } else {
-        Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
-            " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
-                " in state " + vertex.getState() +
-                " . Split in vertex " + originalSplitSource +
-                " sent by vertex " + splitEvent.getSenderVertex() +
-                " numTasks " + splitEvent.getNumTasks());
-        return vertex.getState();
-      }
-    }
-  }
-
   // Temporary to maintain topological order while starting vertices. Not useful
   // since there's not much difference between the INIT and RUNNING states.
   public static class SourceVertexStartedTransition implements

http://git-wip-us.apache.org/repos/asf/tez/blob/363249d6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 029a8bb..7f9159b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2430,14 +2430,14 @@ public class TestVertexImpl {
   
   @Test(timeout = 5000)
   public void testVertexSetParallelism() throws Exception {
-    initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
     Assert.assertEquals(2, v3.getTotalTasks());
     Map<TezTaskID, Task> tasks = v3.getTasks();
     Assert.assertEquals(2, tasks.size());
     TezTaskID firstTask = tasks.keySet().iterator().next();
 
-    v3.vertexReconfigurationPlanned(true);
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
@@ -2460,13 +2460,13 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testVertexSetParallelismIncreaseException() throws Exception {
-    initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
     Assert.assertEquals(2, v3.getTotalTasks());
     Map<TezTaskID, Task> tasks = v3.getTasks();
     Assert.assertEquals(2, tasks.size());
 
-    v3.vertexReconfigurationPlanned(true);
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
@@ -2483,13 +2483,13 @@ public class TestVertexImpl {
   
   @Test(timeout = 5000)
   public void testVertexSetParallelismMultipleException() throws Exception {
-    initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
     Assert.assertEquals(2, v3.getTotalTasks());
     Map<TezTaskID, Task> tasks = v3.getTasks();
     Assert.assertEquals(2, tasks.size());
 
-    v3.vertexReconfigurationPlanned(true);
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
@@ -2544,6 +2544,8 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testSetCustomEdgeManager() throws Exception {
+    VertexImpl v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5 dest)
+    v5.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
     Edge edge = edges.get("e4");
     EdgeManagerPlugin em = edge.getEdgeManager();
@@ -2557,10 +2559,7 @@ public class TestVertexImpl {
     edgeManagerDescriptor.setUserPayload(userPayload);
 
     Vertex v3 = vertices.get("vertex3");
-    VertexImpl v5 = vertices.get("vertex5"); // Vertex5 linked to v3 (v3 src, v5
-                                         // dest)
 
-    v5.vertexReconfigurationPlanned(true);
     Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
         Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
     v5.setParallelism(v5.getTotalTasks() - 1, null, edgeManagerDescriptors, null, true); // Must decrease.
@@ -3288,25 +3287,23 @@ public class TestVertexImpl {
       Assert.assertEquals(v1Hints.get(i), v1.getTaskLocationHints()[i]);
     }
     Assert.assertEquals(true, initializerManager1.hasShutDown);
-    
+
+    startVertex(v1);
+
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
-    Assert.assertEquals(VertexState.INITED, vertices.get("vertex2").getState());
     Assert.assertEquals(numTasks, vertices.get("vertex3").getTotalTasks());
-    Assert.assertEquals(VertexState.INITED, vertices.get("vertex3").getState());
     Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
-    Assert.assertEquals(VertexState.INITED, vertices.get("vertex5").getState());
-    // v5, v6 still initializing since edge is null
-    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    // v4, v6 still initializing since edge is null
     Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex6").getState());
     
-    startVertex(v1);
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex1").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex2").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex3").getState());
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex5").getState());
-    // v5, v6 still initializing since edge is null
-    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    // v4, v6 still initializing since edge is null
     Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.INITIALIZING, vertices.get("vertex6").getState());
     
     mockEdgeManagerDescriptor =
         EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
@@ -3315,7 +3312,7 @@ public class TestVertexImpl {
     e.setCustomEdgeManager(mockEdgeManagerDescriptor);
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
-    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState());
+    Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex6").getState());
   }
   
   @Test(timeout = 5000)
@@ -3326,6 +3323,7 @@ public class TestVertexImpl {
     dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
     setupPostDagCreation();
     VertexImpl v1 = vertices.get("vertex1");
+    v1.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
     
     // fudge vertex manager so that tasks dont start running
@@ -3333,7 +3331,6 @@ public class TestVertexImpl {
         VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()),
         v1, appContext, mock(StateChangeNotifier.class));
     v1.vertexManager.initialize();
-    v1.vertexReconfigurationPlanned(true);
     startVertex(v1);
     
     Assert.assertEquals(numTasks, vertices.get("vertex2").getTotalTasks());
@@ -3365,6 +3362,7 @@ public class TestVertexImpl {
     dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
     setupPostDagCreation();
     VertexImpl v1 = vertices.get("vertex1");
+    v1.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
     
     // fudge vertex manager so that tasks dont start running
@@ -3378,7 +3376,6 @@ public class TestVertexImpl {
     Assert.assertEquals(numTasks, vertices.get("vertex4").getTotalTasks());
     // change parallelism
     int newNumTasks = 3;
-    v1.vertexReconfigurationPlanned(true);
     v1.setParallelism(newNumTasks, null, null, null, true);
     v1.doneReconfiguringVertex();
     dispatcher.await();

http://git-wip-us.apache.org/repos/asf/tez/blob/363249d6/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 0e3a3ce..4415fe4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,8 +56,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   int oneToOneSrcTasksDoneCount[];
   TaskLocationHint oneToOneLocationHints[];
   int numOneToOneEdges;
-  int numSignalsToWaitFor;
+  int numConfiguredSources;
   Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
+  AtomicBoolean configured;
+  AtomicBoolean started;
 
   public InputReadyVertexManager(VertexManagerPluginContext context) {
     super(context);
@@ -76,13 +79,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     }
   }
   
-  void start() {
-    if (!ready()) {
-      return;
-    }
+  private void configure() {
+    Preconditions.checkState(!configured.get(), "Vertex: " + getContext().getVertexName());
     int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
     LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName());
-    taskIsStarted = new boolean[numManagedTasks];
 
     // find out about all input edge types. If there is a custom edge then 
     // TODO Until TEZ-1013 we cannot handle custom input formats
@@ -116,32 +116,51 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     }
     
     if (numOneToOneEdges > 0) {
+      Preconditions
+          .checkState(oneToOneSrcTaskCount >= 0, "Vertex: " + getContext().getVertexName());
       if (oneToOneSrcTaskCount != numManagedTasks) {
-        throw new TezUncheckedException(
-            "Managed task number must equal 1-1 source task number");
+        numManagedTasks = oneToOneSrcTaskCount;
+        // must change parallelism to make them the same
+        LOG.info("Update parallelism of vertex: " + getContext().getVertexName() + 
+            " to " + oneToOneSrcTaskCount + " to match source 1-1 vertices.");
+        getContext().setVertexParallelism(oneToOneSrcTaskCount, null, null, null);
       }
       oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount];
       oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
     }
+    
+    Preconditions.checkState(numManagedTasks >=0, "Vertex: " + getContext().getVertexName());
+    taskIsStarted = new boolean[numManagedTasks];
 
-    for (Map.Entry<String, Collection<Integer>> entry :  pendingCompletions.asMap().entrySet()) {
-      for (Integer task : entry.getValue()) {
-        handleSourceTaskFinished(entry.getKey(), task);
-      }
-    }
+    // allow scheduling
+    configured.set(true);
+    getContext().doneReconfiguringVertex();
+    trySchedulingPendingCompletions();
   }
   
-  boolean ready() {
-    int target = getContext().getInputVertexEdgeProperties().size() + 1;
-    Preconditions.checkState(numSignalsToWaitFor <= target);
-    return (numSignalsToWaitFor == target);
+  private boolean readyToSchedule() {
+    return (configured.get() && started.get());
+  }
+  
+  private void trySchedulingPendingCompletions() {
+    if (readyToSchedule() && !pendingCompletions.isEmpty()) {
+      for (Map.Entry<String, Collection<Integer>> entry : pendingCompletions.asMap().entrySet()) {
+        for (Integer i : entry.getValue()) {
+          onSourceTaskCompleted(entry.getKey(), i);
+        }
+      }
+    }
   }
   
   @Override
   public void initialize() {
+    // this will prevent vertex from starting until we notify we are done
+    getContext().vertexReconfigurationPlanned();
     Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
     // wait for sources and self to start
-    numSignalsToWaitFor = 0;
+    numConfiguredSources = 0;
+    configured = new AtomicBoolean(false);
+    started = new AtomicBoolean(false);
     for (String entry : edges.keySet()) {
       getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
     }
@@ -149,24 +168,33 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   
   @Override
   public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
-    numSignalsToWaitFor++;
-    LOG.info("Received configured signal from: " + stateUpdate.getVertexName() + 
-        " numConfiguredSources: " + numSignalsToWaitFor);
-    start();
+    numConfiguredSources++;
+    int target = getContext().getInputVertexEdgeProperties().size();
+    LOG.info("For vertex: " + getContext().getVertexName() + "Received configured signal from: "
+        + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
+        + " needed: " + target);
+    Preconditions.checkState(numConfiguredSources <= target, "Vertex: " + getContext().getVertexName());
+    if (numConfiguredSources == target) {
+      configure();
+    }
   }
 
   @Override
   public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
     for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
       pendingCompletions.putAll(entry.getKey(), entry.getValue());
-    }
-    numSignalsToWaitFor++;
-    start();
+    }    
+
+    // allow scheduling
+    started.set(true);
+    
+    trySchedulingPendingCompletions();
   }
 
   @Override
   public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-    if (ready()) {
+    if (readyToSchedule()) {
+      // configured and started. try to schedule
       handleSourceTaskFinished(srcVertexName, taskId);
     } else {
       pendingCompletions.put(srcVertexName, taskId);

http://git-wip-us.apache.org/repos/asf/tez/blob/363249d6/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index 9a83a51..8de747d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
@@ -82,8 +83,11 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
-    // first source vertex configured
+    verify(mockContext, times(1)).vertexReconfigurationPlanned();
+    // source vertex configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).doneReconfiguringVertex();
+    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
     // then own vertex started
     manager.onVertexStarted(initialCompletions);
     manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
@@ -119,10 +123,12 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
-    // first own vertex started
-    manager.onVertexStarted(initialCompletions);
-    // then source vertex configured
+    verify(mockContext, times(1)).vertexReconfigurationPlanned();
+    // source vertex configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).doneReconfiguringVertex();
+    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onVertexStarted(initialCompletions);
     verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
@@ -174,17 +180,19 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
-    // first own vertex started
-    manager.onVertexStarted(initialCompletions);
+    verify(mockContext, times(1)).vertexReconfigurationPlanned();
     verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    // ok to have source task complete before anything else
     manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    // first own vertex started
+    manager.onVertexStarted(initialCompletions);
+    // no scheduling as we are not configured yet
     verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
     // then source vertex configured. now we start
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).doneReconfiguringVertex();
+    
     verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
-    Assert.assertEquals(2, requestCaptor.getAllValues().size());
-    Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
     manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
     verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
@@ -247,58 +255,48 @@ public class TestInputReadyVertexManager {
     
     Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
     
-    // 1-1 sources do not match managed tasks before vertex started
+    // 1-1 sources do not match managed tasks. setParallelism called to make them match
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    verify(mockContext, times(1)).vertexReconfigurationPlanned();
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    try {
-      manager.onVertexStarted(initialCompletions);
-      Assert.assertTrue("Should have exception", false);
-    } catch (TezUncheckedException e) {
-      e.getMessage().contains("Managed task number must equal 1-1 source");
-    }
-
-    // 1-1 sources do not match managed tasks after vertex started
-    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
-    manager = new InputReadyVertexManager(mockContext);
-    manager.initialize();
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).setVertexParallelism(3, null, null, null);
+    verify(mockContext, times(1)).doneReconfiguringVertex();
     manager.onVertexStarted(initialCompletions);
-    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-    try {
-      manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-      Assert.assertTrue("Should have exception", false);
-    } catch (TezUncheckedException e) {
-      e.getMessage().contains("Managed task number must equal 1-1 source");
-    }
     
     // 1-1 sources do not match
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    verify(mockContext, times(2)).vertexReconfigurationPlanned();
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     try {
-      manager.onVertexStarted(initialCompletions);
+      manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
       Assert.assertTrue("Should have exception", false);
     } catch (TezUncheckedException e) {
       e.getMessage().contains("1-1 source vertices must have identical concurrency");
     }
+    verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(),
+        anyMap(), anyMap()); // not invoked again; the 1 call is from the first scenario above
+    
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     
     initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
     initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
-    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    verify(mockContext, times(3)).vertexReconfigurationPlanned();
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).setVertexParallelism(anyInt(), (VertexLocationHint) any(),
+        anyMap(), anyMap()); // not invoked again; the count is still 1 from the first scenario
+    verify(mockContext, times(2)).doneReconfiguringVertex();
     manager.onVertexStarted(initialCompletions);
     // all 1-1 0's done but not scheduled because v1 is not done
     manager.onSourceTaskCompleted(mockSrcVertexId3, 0);


Mime
View raw message