tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-364. Make VertexScheduler an abstract class. Rename to VertexManager. (bikas)
Date Sun, 05 Jan 2014 00:49:57 GMT
Updated Branches:
  refs/heads/master 973af812e -> 25615ff32


TEZ-364. Make VertexScheduler an abstract class. Rename to VertexManager. (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/25615ff3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/25615ff3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/25615ff3

Branch: refs/heads/master
Commit: 25615ff32e52505ea3006161162fef1fc35b30bb
Parents: 973af81
Author: Bikas Saha <bikas@apache.org>
Authored: Sat Jan 4 16:49:40 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Sat Jan 4 16:49:40 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/VertexManager.java   |  40 ++
 .../apache/tez/dag/app/dag/VertexScheduler.java |  37 --
 .../dag/impl/ImmediateStartVertexManager.java   |  63 +++
 .../dag/impl/ImmediateStartVertexScheduler.java |  63 ---
 .../app/dag/impl/RootInputVertexManager.java    |   4 +-
 .../dag/app/dag/impl/ShuffleVertexManager.java  |   4 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  26 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  18 +-
 .../tez/dag/app/dag/impl/TestVertexManager.java | 491 +++++++++++++++++++
 .../dag/app/dag/impl/TestVertexScheduler.java   | 491 -------------------
 10 files changed, 620 insertions(+), 617 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexManager.java
new file mode 100644
index 0000000..80b6f69
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexManager.java
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+/** Drives task scheduling for a single vertex; the owning vertex invokes these callbacks. */
+public abstract class VertexManager {
+  public abstract void initialize(Configuration conf); // called after construction with the vertex conf
+  /** Called at vertex start with source completions reported earlier; the caller then clears the list. */
+  public abstract void onVertexStarted(List<TezTaskAttemptID> completions);
+  /** Called when a source task attempt succeeds while the managed vertex is running. */
+  public abstract void onSourceTaskCompleted(TezTaskAttemptID attemptId);
+  /** Called with a VertexManagerEvent whose target vertex is the managed vertex. */
+  public abstract void onVertexManagerEventReceived(VertexManagerEvent vmEvent);
+  /** Called when the named root input of the vertex has been initialized, with its events. */
+  public abstract void onRootVertexInitialized(String inputName, 
+      InputDescriptor inputDescriptor, List<Event> events);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
deleted file mode 100644
index 812e577..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
-
-
-// Rename to VertexManager TEZ-364 and move to DAG API. Make abstract class.
-public interface VertexScheduler {
-  void initialize(Configuration conf);
-  void onVertexStarted(List<TezTaskAttemptID> completions);
-  void onSourceTaskCompleted(TezTaskAttemptID attemptId);
-  void onVertexManagerEventReceived(VertexManagerEvent vmEvent);
-  void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
new file mode 100644
index 0000000..9c6b0bf
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -0,0 +1,63 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexManager;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+/**
+ * Schedules all tasks of the vertex as soon as it starts; the remaining callbacks are no-ops.
+ */
+public class ImmediateStartVertexManager extends VertexManager {
+  final Vertex managedVertex;
+  // VertexImpl picks this manager when there is no bipartite edge and no root input initializer.
+  ImmediateStartVertexManager(Vertex vertex) {
+    managedVertex = vertex;
+  }
+  // Earlier source completions are irrelevant: tasks are not gated on upstream progress.
+  @Override
+  public void onVertexStarted(List<TezTaskAttemptID> completions) {
+    managedVertex.scheduleTasks(managedVertex.getTasks().keySet());
+  }
+  // Nothing to do: scheduling does not depend on source task completions.
+  @Override
+  public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
+  }
+  // No configuration is read.
+  @Override
+  public void initialize(Configuration conf) {
+  }
+  // VertexManagerEvents are ignored.
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+  // Ignored; VertexImpl only picks this manager when no root input has an initializer.
+  @Override
+  public void onRootVertexInitialized(String inputName,
+      InputDescriptor inputDescriptor, List<Event> events) {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
deleted file mode 100644
index 85017b4..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.impl;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexScheduler;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
-
-/**
- * Starts all tasks immediately on vertex start
- */
-public class ImmediateStartVertexScheduler implements VertexScheduler {
-  final Vertex managedVertex;
-  
-  ImmediateStartVertexScheduler(Vertex vertex) {
-    managedVertex = vertex;
-  }
-  
-  @Override
-  public void onVertexStarted(List<TezTaskAttemptID> completions) {
-    managedVertex.scheduleTasks(managedVertex.getTasks().keySet());
-  }
-
-  @Override
-  public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
-  }
-
-  @Override
-  public void initialize(Configuration conf) {    
-  }
-
-  @Override
-  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
-  }
-
-  @Override
-  public void onRootVertexInitialized(String inputName,
-      InputDescriptor inputDescriptor, List<Event> events) {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 6ffe581..ca27c87 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexScheduler;
+import org.apache.tez.dag.app.dag.VertexManager;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -44,7 +44,7 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-public class RootInputVertexManager implements VertexScheduler {
+public class RootInputVertexManager extends VertexManager {
   
   private final Vertex managedVertex;
   private final EventMetaData sourceInfo;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index c83273a..f9e6a89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -34,7 +34,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.app.dag.EdgeManager;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexScheduler;
+import org.apache.tez.dag.app.dag.VertexManager;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -52,7 +52,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
  * <code>slowStartMinSrcCompletionFraction</code> and schedules all tasks 
  *  when <code>slowStartMaxSrcCompletionFraction</code> is reached
  */
-public class ShuffleVertexManager implements VertexScheduler {
+public class ShuffleVertexManager extends VertexManager {
   
   private static final Log LOG = 
                    LogFactory.getLog(ShuffleVertexManager.class);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8d45da1..2672609 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -82,7 +82,7 @@ import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexScheduler;
+import org.apache.tez.dag.app.dag.VertexManager;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -456,7 +456,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private RootInputInitializerRunner rootInputInitializer;
 
-  private VertexScheduler vertexScheduler;
+  private VertexManager vertexManager;
 
   private boolean parallelismSet = false;
   private TezVertexID originalOneToOneSplitSource = null;
@@ -1299,21 +1299,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
 
       if (hasBipartite) {
-        // setup vertex scheduler
+        // setup vertex manager
         // TODO this needs to consider data size and perhaps API.
         // Currently implicitly BIPARTITE is the only edge type
         LOG.info("Setting vertexManager to ShuffleVertexManager for " + vertex.logIdentifier);
-        vertex.vertexScheduler = new ShuffleVertexManager(vertex);
+        vertex.vertexManager = new ShuffleVertexManager(vertex);
       } else if (vertex.inputsWithInitializers != null) {
         LOG.info("Setting vertexManager to RootInputVertexManager for " + vertex.logIdentifier);
-        vertex.vertexScheduler = new RootInputVertexManager(vertex,
+        vertex.vertexManager = new RootInputVertexManager(vertex,
             vertex.eventHandler);
       } else {
         // schedule all tasks upon vertex start
         LOG.info("Setting vertexManager to ImmediateStartVertexManager for " + vertex.logIdentifier);
-        vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
+        vertex.vertexManager = new ImmediateStartVertexManager(vertex);
       }
-      vertex.vertexScheduler.initialize(vertex.conf);
+      vertex.vertexManager.initialize(vertex.conf);
 
       // Setup tasks early if possible. If the VertexManager is not being used
       // to set parallelism, sending events to Tasks is safe (and less confusing
@@ -1437,7 +1437,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event;
 
-      vertex.vertexScheduler.onRootVertexInitialized(
+      vertex.vertexManager.onRootVertexInitialized(
           liInitEvent.getInputName(),
           vertex.getAdditionalInputs().get(liInitEvent.getInputName())
               .getDescriptor(), liInitEvent.getEvents());
@@ -1562,7 +1562,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private void startVertex() {
     startedTime = clock.getTime();
-    vertexScheduler.onVertexStarted(pendingReportedSrcCompletions);
+    vertexManager.onVertexStarted(pendingReportedSrcCompletions);
     pendingReportedSrcCompletions.clear();
     logJobHistoryVertexStartedEvent();
 
@@ -1720,7 +1720,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .getTaskAttemptState())) {
         vertex.numSuccessSourceAttemptCompletions++;
         if (vertex.getState() == VertexState.RUNNING) {
-          vertex.vertexScheduler.onSourceTaskCompleted(completionEvent
+          vertex.vertexManager.onSourceTaskCompleted(completionEvent
               .getTaskAttemptId());
         } else {
           vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
@@ -1938,7 +1938,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
           Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
           if (target == vertex) {
-            vertex.vertexScheduler.onVertexManagerEventReceived(vmEvent);
+            vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
           } else {
             vertex.eventHandler.handle(new VertexEventRouteEvent(target
                 .getVertexId(), Collections.singletonList(tezEvent)));
@@ -2196,8 +2196,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @VisibleForTesting
-  VertexScheduler getVertexScheduler() {
-    return this.vertexScheduler;
+  VertexManager getVertexManager() {
+    return this.vertexManager;
   }
 
   private static void logLocationHints(VertexLocationHint locationHint) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 61c9745..d0c26d4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -1391,14 +1391,14 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexSchedulerInit() {
+  public void testVertexManagerInit() {
     initAllVertices(VertexState.INITED);
     VertexImpl v2 = vertices.get("vertex2");
-    Assert.assertTrue(v2.getVertexScheduler()
-        instanceof ImmediateStartVertexScheduler);
+    Assert.assertTrue(v2.getVertexManager()
+        instanceof ImmediateStartVertexManager);
 
     VertexImpl v6 = vertices.get("vertex6");
-    Assert.assertTrue(v6.getVertexScheduler()
+    Assert.assertTrue(v6.getVertexManager()
         instanceof ShuffleVertexManager);
   }
 
@@ -1776,7 +1776,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(numTasks, v1.getTotalTasks());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v1
-        .getVertexScheduler().getClass().getName());
+        .getVertexManager().getClass().getName());
     Assert.assertEquals(v1Hints, v1.getVertexLocationHint().getTaskLocationHints());
     Assert.assertEquals(true, runner1.hasShutDown);
     
@@ -1830,7 +1830,7 @@ public class TestVertexImpl {
 
     Assert.assertEquals(VertexState.FAILED, v1.getState());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v1
-        .getVertexScheduler().getClass().getName());
+        .getVertexManager().getClass().getName());
     Assert.assertEquals(true, runner1.hasShutDown);
     
     VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
@@ -1840,7 +1840,7 @@ public class TestVertexImpl {
     
     Assert.assertEquals(VertexState.FAILED, v2.getState());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v2
-        .getVertexScheduler().getClass().getName());
+        .getVertexManager().getClass().getName());
     Assert.assertEquals(true, runner2.hasShutDown);
   }
   
@@ -1865,7 +1865,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(5, v1.getTotalTasks());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v1
-        .getVertexScheduler().getClass().getName());
+        .getVertexManager().getClass().getName());
     Assert.assertEquals(v1Hints, v1.getVertexLocationHint().getTaskLocationHints());
     Assert.assertEquals(true, runner1.hasShutDown);
     
@@ -1878,7 +1878,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITED, v2.getState());
     Assert.assertEquals(10, v2.getTotalTasks());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v2
-        .getVertexScheduler().getClass().getName());
+        .getVertexManager().getClass().getName());
     Assert.assertEquals(v2Hints, v2.getVertexLocationHint().getTaskLocationHints());
     Assert.assertEquals(true, runner2.hasShutDown);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
new file mode 100644
index 0000000..916bf27
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+import static org.mockito.Mockito.*;
+
+public class TestVertexManager {
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout = 5000)
+  public void testShuffleVertexManagerAutoParallelism() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+        true);
+    conf.setLong(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
+    ShuffleVertexManager manager = null;
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+    HashMap<Vertex, Edge> mockInputVertices = 
+        new HashMap<Vertex, Edge>();
+    Vertex mockSrcVertex1 = mock(Vertex.class);
+    TezVertexID mockSrcVertexId1 = TezVertexID.getInstance(dagId, 1);
+    EdgeProperty eProp1 = new EdgeProperty(
+        EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, 
+        new OutputDescriptor("out"),
+        new InputDescriptor("in"));
+    when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
+    Vertex mockSrcVertex2 = mock(Vertex.class);
+    TezVertexID mockSrcVertexId2 = TezVertexID.getInstance(dagId, 2);
+    EdgeProperty eProp2 = new EdgeProperty(
+        EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, 
+        new OutputDescriptor("out"),
+        new InputDescriptor("in"));
+    when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
+    Vertex mockSrcVertex3 = mock(Vertex.class);
+    TezVertexID mockSrcVertexId3 = TezVertexID.getInstance(dagId, 3);
+    EdgeProperty eProp3 = new EdgeProperty(
+        EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED, 
+        SchedulingType.SEQUENTIAL, 
+        new OutputDescriptor("out"),
+        new InputDescriptor("in"));
+    when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
+    
+    Vertex mockManagedVertex = mock(Vertex.class);
+    TezVertexID mockManagedVertexId = TezVertexID.getInstance(dagId, 4);
+    when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
+    when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
+    
+    mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
+    mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
+    mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
+
+    // check initialization
+    manager = createManager(conf, mockManagedVertex, 0.1f, 0.1f);
+    Assert.assertTrue(manager.bipartiteSources.size() == 2);
+    Assert.assertTrue(manager.bipartiteSources.containsKey(mockSrcVertexId1));
+    Assert.assertTrue(manager.bipartiteSources.containsKey(mockSrcVertexId2));
+    
+    final HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>();
+    final TezTaskID mockTaskId1 = TezTaskID.getInstance(mockManagedVertexId, 0);
+    managedTasks.put(mockTaskId1, null);
+    final TezTaskID mockTaskId2 = TezTaskID.getInstance(mockManagedVertexId, 1);
+    managedTasks.put(mockTaskId2, null);
+    final TezTaskID mockTaskId3 = TezTaskID.getInstance(mockManagedVertexId, 2);
+    managedTasks.put(mockTaskId3, null);
+    final TezTaskID mockTaskId4 = TezTaskID.getInstance(mockManagedVertexId, 3);
+    managedTasks.put(mockTaskId4, null);
+    
+    when(mockManagedVertex.getTotalTasks()).thenReturn(managedTasks.size());
+    when(mockManagedVertex.getTasks()).thenReturn(managedTasks);
+    
+    final HashSet<TezTaskID> scheduledTasks = new HashSet<TezTaskID>();
+    doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          scheduledTasks.clear();
+          scheduledTasks.addAll((Collection<TezTaskID>)args[0]); 
+          return null;
+      }}).when(mockManagedVertex).scheduleTasks(anyCollection());
+    
+    final Map<Vertex, EdgeManager> newEdgeManagers = new HashMap<Vertex, EdgeManager>();
+    
+    doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+          managedTasks.remove(mockTaskId3);
+          managedTasks.remove(mockTaskId4);
+          newEdgeManagers.clear();
+          newEdgeManagers.putAll((Map<Vertex, EdgeManager>)invocation.getArguments()[1]);
+          return null;
+      }}).when(mockManagedVertex).setParallelism(eq(2), anyMap());
+    
+    // source vertices have 0 tasks. immediate start of all managed tasks
+    when(mockSrcVertex1.getTotalTasks()).thenReturn(0);
+    when(mockSrcVertex2.getTotalTasks()).thenReturn(0);
+    manager.onVertexStarted(null);
+    Assert.assertTrue(manager.pendingTasks.isEmpty());
+    Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+    scheduledTasks.clear();
+    
+    when(mockSrcVertex1.getTotalTasks()).thenReturn(2);
+    when(mockSrcVertex2.getTotalTasks()).thenReturn(2);
+
+    TezTaskAttemptID mockSrcAttemptId11 = 
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 0), 0);
+    TezTaskAttemptID mockSrcAttemptId12 = 
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 1), 0);
+    TezTaskAttemptID mockSrcAttemptId21 = 
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 0), 0);
+    TezTaskAttemptID mockSrcAttemptId31 = 
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId3, 0), 0);
+
+    byte[] payload =
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray();
+    VertexManagerEvent vmEvent = new VertexManagerEvent("Vertex", payload);
+    // parallelism not change due to large data size
+    manager = createManager(conf, mockManagedVertex, 0.1f, 0.1f);
+    manager.onVertexStarted(null);
+    Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
+    Assert.assertTrue(manager.numSourceTasks == 4);
+    manager.onVertexManagerEventReceived(vmEvent);
+    manager.onSourceTaskCompleted(mockSrcAttemptId11);
+    // managedVertex tasks reduced
+    verify(mockManagedVertex, times(0)).setParallelism(anyInt(), anyMap());
+    Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
+    Assert.assertEquals(4, scheduledTasks.size());
+    Assert.assertEquals(1, manager.numSourceTasksCompleted); // TODO
+    Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
+    
+    
+    // parallelism changed due to small data size
+    scheduledTasks.clear();
+    payload =
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteArray();
+    vmEvent = new VertexManagerEvent("Vertex", payload);
+    
+    manager = createManager(conf, mockManagedVertex, 0.5f, 0.5f);
+    manager.onVertexStarted(null);
+    Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasks);
+    // task completion from non-bipartite stage does nothing
+    manager.onSourceTaskCompleted(mockSrcAttemptId31);
+    Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasks);
+    Assert.assertEquals(0, manager.numSourceTasksCompleted);
+    manager.onVertexManagerEventReceived(vmEvent);
+    manager.onSourceTaskCompleted(mockSrcAttemptId11);
+    Assert.assertEquals(4, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+    Assert.assertEquals(1, manager.numSourceTasksCompleted);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+    Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
+    // ignore duplicate completion
+    manager.onSourceTaskCompleted(mockSrcAttemptId11);
+    Assert.assertEquals(4, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+    Assert.assertEquals(1, manager.numSourceTasksCompleted);
+    Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
+    
+    manager.onVertexManagerEventReceived(vmEvent);
+    manager.onSourceTaskCompleted(mockSrcAttemptId12);
+    // managedVertex tasks reduced
+    verify(mockManagedVertex).setParallelism(eq(2), anyMap());
+    Assert.assertEquals(2, newEdgeManagers.size());
+    // TODO improve tests for parallelism
+    Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
+    Assert.assertEquals(2, scheduledTasks.size());
+    Assert.assertTrue(scheduledTasks.contains(mockTaskId1));
+    Assert.assertTrue(scheduledTasks.contains(mockTaskId2));
+    Assert.assertEquals(2, manager.numSourceTasksCompleted);
+    Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
+    Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize);
+    
+    // more completions dont cause recalculation of parallelism
+    manager.onSourceTaskCompleted(mockSrcAttemptId21);
+    verify(mockManagedVertex).setParallelism(eq(2), anyMap());
+    Assert.assertEquals(2, newEdgeManagers.size());
+    
+    EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
+    List<Integer> targets = Lists.newArrayList();
+    DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
+    edgeManager.routeEventToDestinationTasks(dmEvent, 1, 2, targets);
+    Assert.assertEquals(3, dmEvent.getTargetIndex());
+    Assert.assertEquals(0, targets.get(0).intValue());
+    targets.clear();
+    dmEvent = new DataMovementEvent(2, new byte[0]);
+    edgeManager.routeEventToDestinationTasks(dmEvent, 0, 2, targets);
+    Assert.assertEquals(0, dmEvent.getTargetIndex());
+    Assert.assertEquals(1, targets.get(0).intValue());    
+  }
+  
+  /**
+   * Exercises slow-start scheduling in {@link ShuffleVertexManager}: managed tasks are only
+   * scheduled once enough bipartite (SCATTER_GATHER) source tasks have completed, according
+   * to the configured min/max source-completion fractions. Invalid fractions must be rejected.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout = 5000)
+  public void testShuffleVertexManagerSlowStart() {
+    Configuration conf = new Configuration();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
+    HashMap<Vertex, Edge> mockInputVertices = new HashMap<Vertex, Edge>();
+    Vertex mockSrcVertex1 = mock(Vertex.class);
+    TezVertexID mockSrcVertexId1 = TezVertexID.getInstance(dagId, 1);
+    EdgeProperty eProp1 = new EdgeProperty(
+        EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        new OutputDescriptor("out"),
+        new InputDescriptor("in"));
+    when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
+    Vertex mockSrcVertex2 = mock(Vertex.class);
+    TezVertexID mockSrcVertexId2 = TezVertexID.getInstance(dagId, 2);
+    EdgeProperty eProp2 = new EdgeProperty(
+        EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        new OutputDescriptor("out"),
+        new InputDescriptor("in"));
+    when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
+    Vertex mockSrcVertex3 = mock(Vertex.class);
+    TezVertexID mockSrcVertexId3 = TezVertexID.getInstance(dagId, 3);
+    EdgeProperty eProp3 = new EdgeProperty(
+        EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        new OutputDescriptor("out"),
+        new InputDescriptor("in"));
+    when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
+
+    Vertex mockManagedVertex = mock(Vertex.class);
+    // Must be distinct from every source vertex ID (3 is already used by mockSrcVertex3).
+    TezVertexID mockManagedVertexId = TezVertexID.getInstance(dagId, 4);
+    when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
+    when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
+
+    // Fail if there is no bipartite source vertex.
+    mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
+    try {
+      createManager(conf, mockManagedVertex, 0.1f, 0.1f);
+      Assert.fail("Expected TezUncheckedException when no bipartite source exists");
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Atleast 1 bipartite source should exist"));
+    }
+
+    mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
+    mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
+
+    // Check initialization: only the SCATTER_GATHER sources are tracked as bipartite.
+    ShuffleVertexManager manager = createManager(conf, mockManagedVertex, 0.1f, 0.1f);
+    Assert.assertEquals(2, manager.bipartiteSources.size());
+    Assert.assertTrue(manager.bipartiteSources.containsKey(mockSrcVertexId1));
+    Assert.assertTrue(manager.bipartiteSources.containsKey(mockSrcVertexId2));
+
+    HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>();
+    for (int i = 0; i < 3; i++) {
+      managedTasks.put(TezTaskID.getInstance(mockManagedVertexId, i), null);
+    }
+
+    when(mockManagedVertex.getTotalTasks()).thenReturn(3);
+    when(mockManagedVertex.getTasks()).thenReturn(managedTasks);
+
+    // Records the tasks passed to the most recent scheduleTasks() call only.
+    final HashSet<TezTaskID> scheduledTasks = new HashSet<TezTaskID>();
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        scheduledTasks.clear();
+        scheduledTasks.addAll((Collection<TezTaskID>) invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(mockManagedVertex).scheduleTasks(anyCollection());
+
+    // Source vertices have 0 tasks: all managed tasks start immediately.
+    when(mockSrcVertex1.getTotalTasks()).thenReturn(0);
+    when(mockSrcVertex2.getTotalTasks()).thenReturn(0);
+    manager.onVertexStarted(null);
+    Assert.assertTrue(manager.pendingTasks.isEmpty());
+    Assert.assertEquals(3, scheduledTasks.size()); // all tasks scheduled
+
+    when(mockSrcVertex1.getTotalTasks()).thenReturn(2);
+    when(mockSrcVertex2.getTotalTasks()).thenReturn(2);
+
+    try {
+      // Source vertices have some tasks. min < 0.
+      createManager(conf, mockManagedVertex, -0.1f, 0);
+      Assert.fail("Expected IllegalArgumentException for min < 0");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Invalid values for slowStartMinSrcCompletionFraction"));
+    }
+
+    try {
+      // Source vertices have some tasks. min > max.
+      createManager(conf, mockManagedVertex, 0.5f, 0.3f);
+      Assert.fail("Expected IllegalArgumentException for min > max");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Invalid values for slowStartMinSrcCompletionFraction"));
+    }
+
+    // Source vertices have some tasks. min == max == 0: everything is scheduled on start.
+    manager = createManager(conf, mockManagedVertex, 0, 0);
+    manager.onVertexStarted(null);
+    Assert.assertEquals(4, manager.numSourceTasks);
+    Assert.assertEquals(3, manager.totalTasksToSchedule);
+    Assert.assertEquals(0, manager.numSourceTasksCompleted);
+    Assert.assertTrue(manager.pendingTasks.isEmpty());
+    Assert.assertEquals(3, scheduledTasks.size()); // all tasks scheduled
+
+    TezTaskAttemptID mockSrcAttemptId11 =
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 0), 0);
+    TezTaskAttemptID mockSrcAttemptId12 =
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 1), 0);
+    TezTaskAttemptID mockSrcAttemptId21 =
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 0), 0);
+    TezTaskAttemptID mockSrcAttemptId22 =
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 1), 0);
+    TezTaskAttemptID mockSrcAttemptId31 =
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId3, 0), 0);
+
+    // min, max > 0 and min == max: the first bipartite completion (1 of 4) releases all tasks.
+    manager = createManager(conf, mockManagedVertex, 0.25f, 0.25f);
+    manager.onVertexStarted(null);
+    Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasks);
+    // task completion from non-bipartite stage does nothing
+    manager.onSourceTaskCompleted(mockSrcAttemptId31);
+    Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasks);
+    Assert.assertEquals(0, manager.numSourceTasksCompleted);
+    manager.onSourceTaskCompleted(mockSrcAttemptId11);
+    Assert.assertTrue(manager.pendingTasks.isEmpty());
+    Assert.assertEquals(3, scheduledTasks.size()); // all tasks scheduled
+    Assert.assertEquals(1, manager.numSourceTasksCompleted);
+
+    // min == max == absolute max 1.0: every bipartite source task must complete first.
+    manager = createManager(conf, mockManagedVertex, 1.0f, 1.0f);
+    manager.onVertexStarted(null);
+    Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasks);
+    // task completion from non-bipartite stage does nothing
+    manager.onSourceTaskCompleted(mockSrcAttemptId31);
+    Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasks);
+    Assert.assertEquals(0, manager.numSourceTasksCompleted);
+    manager.onSourceTaskCompleted(mockSrcAttemptId11);
+    Assert.assertEquals(3, manager.pendingTasks.size());
+    Assert.assertEquals(1, manager.numSourceTasksCompleted);
+    manager.onSourceTaskCompleted(mockSrcAttemptId12);
+    Assert.assertEquals(3, manager.pendingTasks.size());
+    Assert.assertEquals(2, manager.numSourceTasksCompleted);
+    manager.onSourceTaskCompleted(mockSrcAttemptId21);
+    Assert.assertEquals(3, manager.pendingTasks.size());
+    Assert.assertEquals(3, manager.numSourceTasksCompleted);
+    manager.onSourceTaskCompleted(mockSrcAttemptId22);
+    Assert.assertTrue(manager.pendingTasks.isEmpty());
+    Assert.assertEquals(3, scheduledTasks.size()); // all tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasksCompleted);
+
+    // 0 < min < max < 1.0: tasks are released incrementally between the two fractions.
+    manager = createManager(conf, mockManagedVertex, 0.25f, 0.75f);
+    manager.onVertexStarted(null);
+    Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasks);
+    manager.onSourceTaskCompleted(mockSrcAttemptId11);
+    manager.onSourceTaskCompleted(mockSrcAttemptId12);
+    Assert.assertEquals(2, manager.pendingTasks.size());
+    Assert.assertEquals(1, scheduledTasks.size()); // 1 task scheduled
+    Assert.assertEquals(2, manager.numSourceTasksCompleted);
+    // completion of same task again should not get counted
+    manager.onSourceTaskCompleted(mockSrcAttemptId12);
+    Assert.assertEquals(2, manager.pendingTasks.size());
+    Assert.assertEquals(1, scheduledTasks.size()); // 1 task scheduled
+    Assert.assertEquals(2, manager.numSourceTasksCompleted);
+    manager.onSourceTaskCompleted(mockSrcAttemptId21);
+    Assert.assertEquals(0, manager.pendingTasks.size());
+    Assert.assertEquals(2, scheduledTasks.size()); // 2 tasks scheduled
+    Assert.assertEquals(3, manager.numSourceTasksCompleted);
+    scheduledTasks.clear();
+    manager.onSourceTaskCompleted(mockSrcAttemptId22); // we are done. no action
+    Assert.assertEquals(0, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no task scheduled
+    Assert.assertEquals(4, manager.numSourceTasksCompleted);
+
+    // 0 < min < max == 1.0: the last task is only released when all sources complete.
+    manager = createManager(conf, mockManagedVertex, 0.25f, 1.0f);
+    manager.onVertexStarted(null);
+    Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.numSourceTasks);
+    manager.onSourceTaskCompleted(mockSrcAttemptId11);
+    manager.onSourceTaskCompleted(mockSrcAttemptId12);
+    Assert.assertEquals(2, manager.pendingTasks.size());
+    Assert.assertEquals(1, scheduledTasks.size()); // 1 task scheduled
+    Assert.assertEquals(2, manager.numSourceTasksCompleted);
+    manager.onSourceTaskCompleted(mockSrcAttemptId21);
+    Assert.assertEquals(1, manager.pendingTasks.size());
+    Assert.assertEquals(1, scheduledTasks.size()); // 1 task scheduled
+    Assert.assertEquals(3, manager.numSourceTasksCompleted);
+    manager.onSourceTaskCompleted(mockSrcAttemptId22);
+    Assert.assertEquals(0, manager.pendingTasks.size());
+    Assert.assertEquals(1, scheduledTasks.size()); // last remaining task scheduled
+    Assert.assertEquals(4, manager.numSourceTasksCompleted);
+  }
+  
+  /**
+   * Creates and initializes a {@link ShuffleVertexManager} for {@code vertex} using the given
+   * slow-start source-completion fractions.
+   *
+   * <p>The fractions are written to a copy of {@code conf}, so the caller's configuration is
+   * not mutated and successive managers do not share state through it. All settings already
+   * present in {@code conf} are carried over to the copy.
+   *
+   * @param conf base configuration
+   * @param vertex the vertex to be managed
+   * @param min value for TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION
+   * @param max value for TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION
+   * @return the initialized manager
+   */
+  private ShuffleVertexManager createManager(Configuration conf,
+      Vertex vertex, float min, float max) {
+    Configuration managerConf = new Configuration(conf);
+    managerConf.setFloat(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
+    managerConf.setFloat(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
+    ShuffleVertexManager manager = new ShuffleVertexManager(vertex);
+    manager.initialize(managerConf);
+    return manager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/25615ff3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
deleted file mode 100644
index 301a79b..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.dag.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.app.dag.EdgeManager;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.google.common.collect.Lists;
-
-import static org.mockito.Mockito.*;
-
-public class TestVertexScheduler {
-
-  /**
-   * Exercises auto-parallelism in {@link ShuffleVertexManager}. A large reported source output
-   * leaves the managed vertex's parallelism unchanged. A small one reduces it from 4 to 2 tasks
-   * and installs new edge managers, whose routing of data movement events is then checked.
-   */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  @Test(timeout = 5000)
-  public void testShuffleVertexManagerAutoParallelism() throws IOException {
-    Configuration conf = new Configuration();
-    conf.setBoolean(
-        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
-        true);
-    conf.setLong(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
-    EventHandler mockEventHandler = mock(EventHandler.class);
-    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
-    HashMap<Vertex, Edge> mockInputVertices = new HashMap<Vertex, Edge>();
-    Vertex mockSrcVertex1 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId1 = TezVertexID.getInstance(dagId, 1);
-    EdgeProperty eProp1 = new EdgeProperty(
-        EdgeProperty.DataMovementType.SCATTER_GATHER,
-        EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL,
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
-    when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
-    Vertex mockSrcVertex2 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId2 = TezVertexID.getInstance(dagId, 2);
-    EdgeProperty eProp2 = new EdgeProperty(
-        EdgeProperty.DataMovementType.SCATTER_GATHER,
-        EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL,
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
-    when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
-    Vertex mockSrcVertex3 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId3 = TezVertexID.getInstance(dagId, 3);
-    EdgeProperty eProp3 = new EdgeProperty(
-        EdgeProperty.DataMovementType.BROADCAST,
-        EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL,
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
-    when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
-
-    Vertex mockManagedVertex = mock(Vertex.class);
-    TezVertexID mockManagedVertexId = TezVertexID.getInstance(dagId, 4);
-    when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
-    when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
-
-    mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
-    mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
-    mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
-
-    // Check initialization: only the SCATTER_GATHER sources are tracked as bipartite.
-    ShuffleVertexManager scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
-    Assert.assertEquals(2, scheduler.bipartiteSources.size());
-    Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId1));
-    Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId2));
-
-    final HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>();
-    final TezTaskID mockTaskId1 = TezTaskID.getInstance(mockManagedVertexId, 0);
-    managedTasks.put(mockTaskId1, null);
-    final TezTaskID mockTaskId2 = TezTaskID.getInstance(mockManagedVertexId, 1);
-    managedTasks.put(mockTaskId2, null);
-    final TezTaskID mockTaskId3 = TezTaskID.getInstance(mockManagedVertexId, 2);
-    managedTasks.put(mockTaskId3, null);
-    final TezTaskID mockTaskId4 = TezTaskID.getInstance(mockManagedVertexId, 3);
-    managedTasks.put(mockTaskId4, null);
-
-    when(mockManagedVertex.getTotalTasks()).thenReturn(managedTasks.size());
-    when(mockManagedVertex.getTasks()).thenReturn(managedTasks);
-
-    // Records the tasks passed to the most recent scheduleTasks() call only.
-    final HashSet<TezTaskID> scheduledTasks = new HashSet<TezTaskID>();
-    doAnswer(new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) {
-        scheduledTasks.clear();
-        scheduledTasks.addAll((Collection<TezTaskID>) invocation.getArguments()[0]);
-        return null;
-      }
-    }).when(mockManagedVertex).scheduleTasks(anyCollection());
-
-    // Simulates the vertex shrinking to 2 tasks and captures the replacement edge managers.
-    final Map<Vertex, EdgeManager> newEdgeManagers = new HashMap<Vertex, EdgeManager>();
-    doAnswer(new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) {
-        managedTasks.remove(mockTaskId3);
-        managedTasks.remove(mockTaskId4);
-        newEdgeManagers.clear();
-        newEdgeManagers.putAll((Map<Vertex, EdgeManager>) invocation.getArguments()[1]);
-        return null;
-      }
-    }).when(mockManagedVertex).setParallelism(eq(2), anyMap());
-
-    // Source vertices have 0 tasks: all managed tasks start immediately.
-    when(mockSrcVertex1.getTotalTasks()).thenReturn(0);
-    when(mockSrcVertex2.getTotalTasks()).thenReturn(0);
-    scheduler.onVertexStarted(null);
-    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
-    Assert.assertEquals(4, scheduledTasks.size()); // all tasks scheduled
-    scheduledTasks.clear();
-
-    when(mockSrcVertex1.getTotalTasks()).thenReturn(2);
-    when(mockSrcVertex2.getTotalTasks()).thenReturn(2);
-
-    TezTaskAttemptID mockSrcAttemptId11 =
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 0), 0);
-    TezTaskAttemptID mockSrcAttemptId12 =
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 1), 0);
-    TezTaskAttemptID mockSrcAttemptId21 =
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 0), 0);
-    TezTaskAttemptID mockSrcAttemptId31 =
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId3, 0), 0);
-
-    // Large reported output size: parallelism does not change.
-    byte[] payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray();
-    VertexManagerEvent vmEvent = new VertexManagerEvent("Vertex", payload);
-    scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
-    scheduler.onVertexStarted(null);
-    Assert.assertEquals(4, scheduler.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(4, scheduler.numSourceTasks);
-    scheduler.onVertexManagerEventReceived(vmEvent);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
-    // managedVertex parallelism is NOT reduced
-    verify(mockManagedVertex, never()).setParallelism(anyInt(), anyMap());
-    Assert.assertEquals(0, scheduler.pendingTasks.size()); // all tasks scheduled
-    Assert.assertEquals(4, scheduledTasks.size());
-    Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
-    Assert.assertEquals(5000L, scheduler.completedSourceTasksOutputSize);
-
-    // Small reported output size: parallelism is reduced.
-    scheduledTasks.clear();
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteArray();
-    vmEvent = new VertexManagerEvent("Vertex", payload);
-
-    scheduler = createScheduler(conf, mockManagedVertex, 0.5f, 0.5f);
-    scheduler.onVertexStarted(null);
-    Assert.assertEquals(4, scheduler.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(4, scheduler.numSourceTasks);
-    // task completion from non-bipartite stage does nothing
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
-    Assert.assertEquals(4, scheduler.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(4, scheduler.numSourceTasks);
-    Assert.assertEquals(0, scheduler.numSourceTasksCompleted);
-    scheduler.onVertexManagerEventReceived(vmEvent);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
-    Assert.assertEquals(4, scheduler.pendingTasks.size());
-    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
-    Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
-    Assert.assertEquals(1, scheduler.numVertexManagerEventsReceived);
-    Assert.assertEquals(500L, scheduler.completedSourceTasksOutputSize);
-    // ignore duplicate completion
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
-    Assert.assertEquals(4, scheduler.pendingTasks.size());
-    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
-    Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
-    Assert.assertEquals(500L, scheduler.completedSourceTasksOutputSize);
-
-    scheduler.onVertexManagerEventReceived(vmEvent);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
-    // managedVertex tasks reduced
-    verify(mockManagedVertex).setParallelism(eq(2), anyMap());
-    Assert.assertEquals(2, newEdgeManagers.size());
-    // TODO improve tests for parallelism
-    Assert.assertEquals(0, scheduler.pendingTasks.size()); // all tasks scheduled
-    Assert.assertEquals(2, scheduledTasks.size());
-    Assert.assertTrue(scheduledTasks.contains(mockTaskId1));
-    Assert.assertTrue(scheduledTasks.contains(mockTaskId2));
-    Assert.assertEquals(2, scheduler.numSourceTasksCompleted);
-    Assert.assertEquals(2, scheduler.numVertexManagerEventsReceived);
-    Assert.assertEquals(1000L, scheduler.completedSourceTasksOutputSize);
-
-    // more completions don't cause recalculation of parallelism
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
-    verify(mockManagedVertex).setParallelism(eq(2), anyMap());
-    Assert.assertEquals(2, newEdgeManagers.size());
-
-    EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
-    List<Integer> targets = Lists.newArrayList();
-    DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
-    edgeManager.routeEventToDestinationTasks(dmEvent, 1, 2, targets);
-    Assert.assertEquals(3, dmEvent.getTargetIndex());
-    Assert.assertEquals(0, targets.get(0).intValue());
-    targets.clear();
-    dmEvent = new DataMovementEvent(2, new byte[0]);
-    edgeManager.routeEventToDestinationTasks(dmEvent, 0, 2, targets);
-    Assert.assertEquals(0, dmEvent.getTargetIndex());
-    Assert.assertEquals(1, targets.get(0).intValue());
-  }
-  
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  @Test(timeout = 5000)
-  public void testShuffleVertexManagerSlowStart() {
-    Configuration conf = new Configuration();
-    ShuffleVertexManager scheduler = null;
-    EventHandler mockEventHandler = mock(EventHandler.class);
-    TezDAGID dagId = TezDAGID.getInstance("1", 1, 1);
-    HashMap<Vertex, Edge> mockInputVertices = 
-        new HashMap<Vertex, Edge>();
-    Vertex mockSrcVertex1 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId1 = TezVertexID.getInstance(dagId, 1);
-    EdgeProperty eProp1 = new EdgeProperty(
-        EdgeProperty.DataMovementType.SCATTER_GATHER,
-        EdgeProperty.DataSourceType.PERSISTED, 
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
-    when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1);
-    Vertex mockSrcVertex2 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId2 = TezVertexID.getInstance(dagId, 2);
-    EdgeProperty eProp2 = new EdgeProperty(
-        EdgeProperty.DataMovementType.SCATTER_GATHER,
-        EdgeProperty.DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
-    when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2);
-    Vertex mockSrcVertex3 = mock(Vertex.class);
-    TezVertexID mockSrcVertexId3 = TezVertexID.getInstance(dagId, 3);
-    EdgeProperty eProp3 = new EdgeProperty(
-        EdgeProperty.DataMovementType.BROADCAST,
-        EdgeProperty.DataSourceType.PERSISTED, 
-        SchedulingType.SEQUENTIAL, 
-        new OutputDescriptor("out"),
-        new InputDescriptor("in"));
-    when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3);
-    
-    Vertex mockManagedVertex = mock(Vertex.class);
-    TezVertexID mockManagedVertexId = TezVertexID.getInstance(dagId, 3);
-    when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
-    when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
-    
-    // fail if there is no bipartite src vertex
-    mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
-    try {
-      scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
-     Assert.assertFalse(true);
-    } catch (TezUncheckedException e) {
-      Assert.assertTrue(e.getMessage().contains(
-          "Atleast 1 bipartite source should exist"));
-    }
-    
-    mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
-    mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
-    
-    // check initialization
-    scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
-    Assert.assertTrue(scheduler.bipartiteSources.size() == 2);
-    Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId1));
-    Assert.assertTrue(scheduler.bipartiteSources.containsKey(mockSrcVertexId2));
-    
-    HashMap<TezTaskID, Task> managedTasks = new HashMap<TezTaskID, Task>();
-    TezTaskID mockTaskId1 = TezTaskID.getInstance(mockManagedVertexId, 0);
-    managedTasks.put(mockTaskId1, null);
-    TezTaskID mockTaskId2 = TezTaskID.getInstance(mockManagedVertexId, 1);
-    managedTasks.put(mockTaskId2, null);
-    TezTaskID mockTaskId3 = TezTaskID.getInstance(mockManagedVertexId, 2);
-    managedTasks.put(mockTaskId3, null);
-    
-    when(mockManagedVertex.getTotalTasks()).thenReturn(3);
-    when(mockManagedVertex.getTasks()).thenReturn(managedTasks);
-    
-    final HashSet<TezTaskID> scheduledTasks = new HashSet<TezTaskID>();
-    doAnswer(new Answer() {
-      public Object answer(InvocationOnMock invocation) {
-          Object[] args = invocation.getArguments();
-          scheduledTasks.clear();
-          scheduledTasks.addAll((Collection<TezTaskID>)args[0]); 
-          return null;
-      }}).when(mockManagedVertex).scheduleTasks(anyCollection());
-    
-    // source vertices have 0 tasks. immediate start of all managed tasks
-    when(mockSrcVertex1.getTotalTasks()).thenReturn(0);
-    when(mockSrcVertex2.getTotalTasks()).thenReturn(0);
-    scheduler.onVertexStarted(null);
-    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
-    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-    
-    when(mockSrcVertex1.getTotalTasks()).thenReturn(2);
-    when(mockSrcVertex2.getTotalTasks()).thenReturn(2);
-
-    try {
-      // source vertex have some tasks. min < 0.
-      scheduler = createScheduler(conf, mockManagedVertex, -0.1f, 0);
-      Assert.assertTrue(false); // should not come here
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage().contains(
-          "Invalid values for slowStartMinSrcCompletionFraction"));
-    }
-    
-    try {
-      // source vertex have some tasks. min > max
-      scheduler = createScheduler(conf, mockManagedVertex, 0.5f, 0.3f);
-      Assert.assertTrue(false); // should not come here
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage().contains(
-          "Invalid values for slowStartMinSrcCompletionFraction"));
-    }
-    
-    // source vertex have some tasks. min, max == 0
-    scheduler = createScheduler(conf, mockManagedVertex, 0, 0);
-    scheduler.onVertexStarted(null);
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    Assert.assertTrue(scheduler.totalTasksToSchedule == 3);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
-    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
-    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-
-    TezTaskAttemptID mockSrcAttemptId11 = 
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 0), 0);
-    TezTaskAttemptID mockSrcAttemptId12 = 
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId1, 1), 0);
-    TezTaskAttemptID mockSrcAttemptId21 = 
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 0), 0);
-    TezTaskAttemptID mockSrcAttemptId22 = 
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId2, 1), 0);
-    TezTaskAttemptID mockSrcAttemptId31 = 
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(mockSrcVertexId3, 0), 0);
-    
-    // min, max > 0 and min == max
-    scheduler = createScheduler(conf, mockManagedVertex, 0.25f, 0.25f);
-    scheduler.onVertexStarted(null);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    // task completion from non-bipartite stage does nothing
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
-    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
-    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
-    
-    // min, max > 0 and min == max == absolute max 1.0
-    scheduler = createScheduler(conf, mockManagedVertex, 1.0f, 1.0f);
-    scheduler.onVertexStarted(null);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    // task completion from non-bipartite stage does nothing
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
-    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
-    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
-    
-    // min, max > 0 and min == max
-    scheduler = createScheduler(conf, mockManagedVertex, 1.0f, 1.0f);
-    scheduler.onVertexStarted(null);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    // task completion from non-bipartite stage does nothing
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
-    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
-    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
-    
-    // min, max > and min < max
-    scheduler = createScheduler(conf, mockManagedVertex, 0.25f, 0.75f);
-    scheduler.onVertexStarted(null);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
-    // completion of same task again should not get counted
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 0);
-    Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
-    scheduledTasks.clear();
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId22); // we are done. no action
-    Assert.assertTrue(scheduler.pendingTasks.size() == 0);
-    Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
-
-    // min, max > and min < max
-    scheduler = createScheduler(conf, mockManagedVertex, 0.25f, 1.0f);
-    scheduler.onVertexStarted(null);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(scheduler.numSourceTasks == 4);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 1);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
-    scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
-    Assert.assertTrue(scheduler.pendingTasks.size() == 0);
-    Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
-    Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
-
-  }
-  
-  private ShuffleVertexManager createScheduler(Configuration conf, 
-      Vertex vertex, float min, float max) {
-    ShuffleVertexManager scheduler = new ShuffleVertexManager(vertex);
-    conf.setFloat(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min);
-    conf.setFloat(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max);
-    scheduler.initialize(conf);
-    return scheduler;
-  }
-}


Mime
View raw message