tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start (bikas)
Date Thu, 10 Dec 2015 01:00:43 GMT
Repository: tez
Updated Branches:
  refs/heads/master 5af06047f -> 0dd68ad1c


TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0dd68ad1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0dd68ad1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0dd68ad1

Branch: refs/heads/master
Commit: 0dd68ad1c92eca98a98b8bd4a0c54b8656573e8d
Parents: 5af0604
Author: Bikas Saha <bikas@apache.org>
Authored: Wed Dec 9 17:00:17 2015 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Wed Dec 9 17:00:17 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../vertexmanager/ShuffleVertexManager.java     | 142 +++++++------
 .../vertexmanager/TestShuffleVertexManager.java | 199 +++++++++++--------
 3 files changed, 209 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9031566..cfa93bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
+  reduce and slow start
   TEZ-2346. TEZ-UI: Lazy load other info / counter data
   TEZ-2975. Bump up apache commons dependency.
   TEZ-2970. Re-localization in TezChild does not use correct UGI.

http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index f10c89a..c88c7a2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -168,12 +168,21 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     EdgeProperty edgeProperty;
     boolean vertexIsConfigured;
     BitSet finishedTaskSet;
+    int numTasks;
+    int numVMEventsReceived;
+    long outputSize;
 
     SourceVertexInfo(EdgeProperty edgeProperty) {
       this.edgeProperty = edgeProperty;
-      if (edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
-        finishedTaskSet = new BitSet();
-      }
+      finishedTaskSet = new BitSet();
+    }
+    
+    int getNumTasks() {
+      return numTasks;
+    }
+    
+    int getNumCompletedTasks() {
+      return finishedTaskSet.cardinality();
     }
   }
 
@@ -482,6 +491,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
       srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue()));
       // TODO what if derived class has already called this
+      // register for status update from all source vertices
       getContext().registerForVertexStateUpdates(entry.getKey(),
           EnumSet.of(VertexState.CONFIGURED));
       if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
@@ -498,7 +508,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     
     // track the tasks in this vertex
     updatePendingTasks();
-    updateSourceTaskCount();
     
     LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() +
              " with " + totalNumBipartiteSourceTasks + " source tasks and " +
@@ -519,19 +528,20 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
     String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
     int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
-    updateSourceTaskCount();
     SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
-
-    if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
-      //handle duplicate events for bipartite sources
-      BitSet completedSourceTasks = srcInfo.finishedTaskSet;
-      if (completedSourceTasks != null) {
-        // duplicate notifications tracking
-        if (!completedSourceTasks.get(srcTaskId)) {
-          completedSourceTasks.set(srcTaskId);
-          // source task has completed
-          ++numBipartiteSourceTasksCompleted;
-        }
+    if (srcInfo.vertexIsConfigured) {
+      Preconditions.checkState(srcTaskId < srcInfo.numTasks,  
+          "Received completion for srcTaskId " + srcTaskId + " but Vertex: " + srcVertexName +
+          " has only " + srcInfo.numTasks + " tasks");
+    }
+    //handle duplicate events and count task completions from all source vertices
+    BitSet completedSourceTasks = srcInfo.finishedTaskSet;
+    // duplicate notifications tracking
+    if (!completedSourceTasks.get(srcTaskId)) {
+      completedSourceTasks.set(srcTaskId);
+      // source task has completed
+      if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+        numBipartiteSourceTasksCompleted++;
       }
     }
     schedulePendingTasks();
@@ -564,7 +574,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       LOG.info("Ignoring vertex manager event from: " + producerTask);
       return;
     }
-    
+
+    String vName = producerTask.getVertexIdentifier().getName();
+    SourceVertexInfo srcInfo = srcVertexInfo.get(vName);
+    Preconditions.checkState(srcInfo != null, "Unknown vmEvent from " + producerTask);
+
     numVertexManagerEventsReceived++;
 
     long sourceTaskOutputSize = 0;
@@ -591,12 +605,17 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
           throw new TezUncheckedException(e);
         }
       }
+      srcInfo.numVMEventsReceived++;
+      srcInfo.outputSize += sourceTaskOutputSize;
       completedSourceTasksOutputSize += sourceTaskOutputSize;
     }
-
+    
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Received info of output size: " + sourceTaskOutputSize
-          + " numInfoReceived: " + numVertexManagerEventsReceived
+      LOG.debug("For attempt: " + vmEvent.getProducerAttemptIdentifier()
+          + " received info of output size: " + sourceTaskOutputSize
+          + " vertex numEventsReceived: " + srcInfo.numVMEventsReceived
+          + " vertex output size: " + srcInfo.outputSize
+          + " total numEventsReceived: " + numVertexManagerEventsReceived
           + " total output size: " + completedSourceTasksOutputSize);
     }
   }
@@ -613,7 +632,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
     totalTasksToSchedule = pendingTasks.size();
     if (stats == null) {
-      stats = new long[totalTasksToSchedule];
+      stats = new long[totalTasksToSchedule]; // TODO lost previous data
     }
   }
 
@@ -625,22 +644,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     });
   }
 
-  void updateSourceTaskCount() {
-    // track source vertices
-    int numSrcTasks = 0;
-    Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr = getBipartiteInfo();
-    for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
-      numSrcTasks += getContext().getVertexNumTasks(entry.getKey());
-    }
-    totalNumBipartiteSourceTasks = numSrcTasks;
-  }
-
   /**
    * Compute optimal parallelism needed for the job
    * @return true (if parallelism is determined), false otherwise
    */
   @VisibleForTesting
-  boolean determineParallelismAndApply() {
+  boolean determineParallelismAndApply(float minSourceVertexCompletedTaskFraction) {
     if(numVertexManagerEventsReceived == 0) {
       if (totalNumBipartiteSourceTasks > 0) {
         return true;
@@ -656,21 +665,28 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
      */
     boolean canDetermineParallelismLater = (completedSourceTasksOutputSize <
         desiredTaskInputDataSize)
-        && (numBipartiteSourceTasksCompleted < (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction));
+        && (minSourceVertexCompletedTaskFraction < slowStartMaxSrcCompletionFraction);
     if (canDetermineParallelismLater) {
       LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName()
           + ", totalNumBipartiteSourceTasks=" + totalNumBipartiteSourceTasks
           + ", completedSourceTasksOutputSize=" + completedSourceTasksOutputSize
           + ", numVertexManagerEventsReceived=" + numVertexManagerEventsReceived
-          + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted + ", maxThreshold="
-          + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction));
+          + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted
+          + ", minSourceVertexCompletedTaskFraction=" + minSourceVertexCompletedTaskFraction);
       return false;
     }
 
+    // Change this to use per partition stats for more accuracy TEZ-2962.
+    // Instead of aggregating overall size and then dividing equally - coalesce partitions until 
+    // desired per partition size is achieved.
     long expectedTotalSourceTasksOutputSize = 0;
-    if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) {
-      expectedTotalSourceTasksOutputSize =
-          (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+    for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+      SourceVertexInfo srcInfo = vInfo.getValue();
+      if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) {
+        // this assumes that 1 vmEvent is received per completed task - TEZ-2961
+        expectedTotalSourceTasksOutputSize += 
+            (srcInfo.numTasks * srcInfo.outputSize) / srcInfo.numVMEventsReceived;
+      }
     }
 
     int desiredTaskParallelism = 
@@ -755,7 +771,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
   }
 
-  void schedulePendingTasks(int numTasksToSchedule) {
+  void schedulePendingTasks(int numTasksToSchedule, float minSourceVertexCompletedTaskFraction) {
     // determine parallelism before scheduling the first time
     // this is the latest we can wait before determining parallelism.
     // currently this depends on task completion and so this is the best time
@@ -764,7 +780,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     // calculating parallelism or change parallelism while tasks are already
     // running then we can create other parameters to trigger this calculation.
     if(enableAutoParallelism && !parallelismDetermined) {
-      parallelismDetermined = determineParallelismAndApply();
+      parallelismDetermined = determineParallelismAndApply(minSourceVertexCompletedTaskFraction);
       if (!parallelismDetermined) {
         //try to determine parallelism later when more info is available.
         return;
@@ -851,10 +867,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
    */
   boolean canScheduleTasks() {
     for(Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
-      String sourceVertex = entry.getKey();
-      int numSourceTasks = getContext().getVertexNumTasks(sourceVertex);
-      if (numSourceTasks > 0 && !entry.getValue().vertexIsConfigured) {
-        // vertex not configured
+      // need to check that the vertex is configured because until then we don't know if numTasks==0 is valid
+      if (!entry.getValue().vertexIsConfigured) { // isConfigured
+        // vertex is not configured yet
         if (LOG.isDebugEnabled()) {
           LOG.debug("Waiting for vertex: " + entry.getKey() + " in vertex: "
               + getContext().getVertexName());
@@ -888,29 +903,38 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       LOG.info("All source tasks assigned. " +
           "Ramping up " + numPendingTasks + 
           " remaining tasks for vertex: " + getContext().getVertexName());
-      schedulePendingTasks(numPendingTasks);
+      schedulePendingTasks(numPendingTasks, 1);
       return;
     }
 
-    float completedSourceTaskFraction = 0f;
-    if (totalNumBipartiteSourceTasks != 0) { // support for 0 source tasks
-      completedSourceTaskFraction = (float) numBipartiteSourceTasksCompleted / totalNumBipartiteSourceTasks;
-    } else {
-      completedSourceTaskFraction = 1;
+    float minSourceVertexCompletedTaskFraction = 1f;
+    String minCompletedVertexName = "";
+    for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+      SourceVertexInfo srcInfo = vInfo.getValue();
+      // canScheduleTasks check has already verified all sources are configured
+      Preconditions.checkState(srcInfo.vertexIsConfigured, "Vertex: " + vInfo.getKey());
+      if (srcInfo.numTasks > 0) {
+        int numCompletedTasks = srcInfo.getNumCompletedTasks();
+        float completedFraction = (float) numCompletedTasks / srcInfo.numTasks;
+        if (minSourceVertexCompletedTaskFraction > completedFraction) {
+          minSourceVertexCompletedTaskFraction = completedFraction;
+          minCompletedVertexName = vInfo.getKey();
+        }
+      }
     }
 
     // start scheduling when source tasks completed fraction is more than min.
     // linearly increase the number of scheduled tasks such that all tasks are 
     // scheduled when source tasks completed fraction reaches max
-    float tasksFractionToSchedule = 1; 
+    float tasksFractionToSchedule = 1;
     float percentRange = slowStartMaxSrcCompletionFraction - slowStartMinSrcCompletionFraction;
     if (percentRange > 0) {
       tasksFractionToSchedule = 
-            (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/
+            (minSourceVertexCompletedTaskFraction - slowStartMinSrcCompletionFraction)/
             percentRange;
     } else {
       // min and max are equal. schedule 100% on reaching min
-      if(completedSourceTaskFraction < slowStartMinSrcCompletionFraction) {
+      if(minSourceVertexCompletedTaskFraction < slowStartMinSrcCompletionFraction) {
         tasksFractionToSchedule = 0;
       }
     }
@@ -928,10 +952,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
                getContext().getVertexName() + " with totalTasks: " +
                totalTasksToSchedule + ". " + numBipartiteSourceTasksCompleted +
                " source tasks completed out of " + totalNumBipartiteSourceTasks +
-               ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + 
+               ". MinSourceTaskCompletedFraction: " + minSourceVertexCompletedTaskFraction +
+               " in Vertex: " + minCompletedVertexName +
                " min: " + slowStartMinSrcCompletionFraction + 
                " max: " + slowStartMaxSrcCompletionFraction);
-      schedulePendingTasks(numTasksToSchedule);
+      schedulePendingTasks(numTasksToSchedule, minSourceVertexCompletedTaskFraction);
     }
   }
 
@@ -1002,8 +1027,13 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     SourceVertexInfo vInfo = srcVertexInfo.get(stateUpdate.getVertexName()); 
     Preconditions.checkState(vInfo.vertexIsConfigured == false);
     vInfo.vertexIsConfigured = true;
+    vInfo.numTasks = getContext().getVertexNumTasks(stateUpdate.getVertexName());
+    if (vInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+      totalNumBipartiteSourceTasks += vInfo.numTasks;
+    }
     LOG.info("Received configured notification : " + stateUpdate.getVertexState() + " for vertex: "
-      + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
+      + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName() + 
+      " numBipartiteSourceTasks: " + totalNumBipartiteSourceTasks);
     schedulePendingTasks();
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 862e4df..9d53ebc 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -242,10 +242,12 @@ public class TestShuffleVertexManager {
     
     // check waiting for notification before scheduling
     Assert.assertFalse(manager.pendingTasks.isEmpty());
-    // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
+    // source vertices have 0 tasks. triggers scheduling
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
-    verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+    verify(mockContext, times(1)).reconfigureVertex(eq(1), any
         (VertexLocationHint.class), anyMap());
     verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
@@ -258,13 +260,15 @@ public class TestShuffleVertexManager {
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
     // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling
     // normally this event will not come before onVertexStarted() is called
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start
     Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
     // trigger start and processing of pending notification events
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
-    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+    verify(mockContext, times(2)).reconfigureVertex(eq(1), any
         (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -275,30 +279,31 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
 
-    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, mockSrcVertexId1);
     // parallelism not change due to large data size
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
     verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     manager.onVertexManagerEventReceived(vmEvent);
 
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
         (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex();
     // trigger scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
         (VertexLocationHint.class), anyMap());
     verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());
     // TODO TEZ-1714 locking verify(mockContext, times(2)).vertexManagerDone(); // notified after scheduling all tasks
-    Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
 
     /**
@@ -313,11 +318,10 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
     manager.onVertexStarted(emptyCompletions);
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
-    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId1));
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
     manager.onVertexManagerEventReceived(vmEvent);
     Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
 
@@ -329,7 +333,7 @@ public class TestShuffleVertexManager {
 
     // sending again from a different version of the same task has not impact
     TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
-    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId2));
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2));
     manager.onVertexManagerEventReceived(vmEvent);
     Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
 
@@ -348,41 +352,42 @@ public class TestShuffleVertexManager {
     //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
     manager.onVertexStarted(emptyCompletions);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //First task in src1 completed with small payload
-    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+    vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1);
     manager.onVertexManagerEventReceived(vmEvent); //small payload
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
-    Assert.assertTrue(manager.determineParallelismAndApply() == false);
+    Assert.assertTrue(manager.determineParallelismAndApply(0f) == false);
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
     Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
 
-    //Second task in src1 completed with small payload
-    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+    //First task in src2 completed with small payload
+    vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2);
     manager.onVertexManagerEventReceived(vmEvent); //small payload
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
-    Assert.assertTrue(manager.determineParallelismAndApply() == false);
+    Assert.assertTrue(manager.determineParallelismAndApply(0.25f) == false);
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
-    Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
     Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
 
     //First task in src2 completed (with larger payload) to trigger determining parallelism
-    vmEvent = getVertexManagerEvent(null, 1200L, "Vertex");
+    vmEvent = getVertexManagerEvent(null, 1200L, mockSrcVertexId2);
     manager.onVertexManagerEventReceived(vmEvent);
-    Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
+    Assert.assertTrue(manager.determineParallelismAndApply(0.25f)); //ensure parallelism is determined
+    verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
-    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertEquals(1, manager.pendingTasks.size());
     Assert.assertEquals(1, scheduledTasks.size());
@@ -395,10 +400,11 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
     scheduledTasks.clear();
-    vmEvent = getVertexManagerEvent(null, 100L, "Vertex");
 
     //min/max fraction of 0.0/0.2
     manager = createManager(conf, mockContext, 0.0f, 0.2f);
+    // initial invocation count == 3
+    verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
@@ -407,22 +413,25 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
     //send 7 events with payload size as 100
-    for(int i=0;i<7;i++) {
-      manager.onVertexManagerEventReceived(vmEvent); //small payload
+    for(int i=0;i<8;i++) {
+      //small payload - create new event each time or it will be ignored (from same task)
+      manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 100L, mockSrcVertexId1));
       manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
       //should not change parallelism
-      verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
+      verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    }
+    for(int i=0;i<3;i++) {
+      manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i));
+      verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     }
-    //send 8th event with payload size as 100
-    manager.onVertexManagerEventReceived(vmEvent);
-
     //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
     //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
-    verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class),
-        anyMap());
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
+    // parallelism updated
+    verify(mockContext, times(4)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    // check exact update value - 8 events of 100 each, extrapolated to 20 tasks => 2000 => 2 tasks (with 1000 per task)
+    verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
 
     //reset context for next test
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
@@ -444,7 +453,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-    vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+    vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId1);
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertEquals(4, manager.pendingTasks.size());
@@ -458,14 +467,15 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
-    vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+    vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId2);
     manager.onVertexManagerEventReceived(vmEvent);
     //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
 
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     // managedVertex tasks reduced
-    verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     // TODO improve tests for parallelism
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -478,7 +488,7 @@ public class TestShuffleVertexManager {
     
     // more completions don't cause recalculation of parallelism
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
-    verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     
     EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
@@ -548,7 +558,8 @@ public class TestShuffleVertexManager {
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
-    
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
+
     // fail if there is no bipartite src vertex
     mockInputVertices.put(mockSrcVertexId3, eProp3);
     try {
@@ -583,6 +594,9 @@ public class TestShuffleVertexManager {
     // source vertices have 0 tasks. immediate start of all managed tasks
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -618,8 +632,9 @@ public class TestShuffleVertexManager {
     }
 
     // source vertices have some tasks. min > default and max undefined
-    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
-    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+    int numTasks = 20;
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks);
     scheduledTasks.clear();
 
     manager = createManager(conf, mockContext, 0.8f, null);
@@ -627,18 +642,17 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+
     Assert.assertEquals(3, manager.pendingTasks.size());
-    Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(numTasks*2, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
-    float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks;
-    int completedTasks = 0;
+    float completedTasksThreshold = 0.8f * numTasks;
     // Finish all tasks before exceeding the threshold
     for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) {
       for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
-        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
-        ++completedTasks;
-        if ((completedTasks + 1) >= completedTasksThreshold) {
+        // complete 0th tasks outside the loop
+        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i+1));
+        if ((i + 2) >= completedTasksThreshold) {
           // stop before completing more than min/max source tasks
           break;
         }
@@ -649,7 +663,10 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
 
     // Cross the min/max threshold to schedule all tasks
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    Assert.assertEquals(3, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size());
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertEquals(0, manager.pendingTasks.size());
     Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled
 
@@ -660,13 +677,13 @@ public class TestShuffleVertexManager {
     // source vertices have some tasks. min, max == 0
     manager = createManager(conf, mockContext, 0.0f, 0.0f);
     manager.onVertexStarted(emptyCompletions);
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.totalTasksToSchedule == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
     // all source vertices need to be configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     
@@ -683,10 +700,15 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
+    // task completion on only 1 SG edge does nothing
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
     
     // min, max > 0 and min == max == absolute max 1.0
     manager = createManager(conf, mockContext, 1.0f, 1.0f);
@@ -742,6 +764,10 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
     
+    // reset vertices for next test
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4);
+
     // min, max > 0 and min < max
     manager = createManager(conf, mockContext, 0.25f, 0.75f);
     manager.onVertexStarted(emptyCompletions);
@@ -749,26 +775,30 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
-    Assert.assertTrue(manager.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
     // completion of same task again should not get counted
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
-    Assert.assertTrue(manager.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 2);
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
     scheduledTasks.clear();
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
 
     // min, max > 0 and min < max
     manager = createManager(conf, mockContext, 0.25f, 1.0f);
@@ -777,20 +807,24 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
     Assert.assertTrue(manager.pendingTasks.size() == 1);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 3));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
-    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
 
   }
 
@@ -844,7 +878,7 @@ public class TestShuffleVertexManager {
     when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
     when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
 
-    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex");
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
     // check initialization
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
 
@@ -867,11 +901,11 @@ public class TestShuffleVertexManager {
 
     manager.onVertexManagerEventReceived(vmEvent);
     Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
 
     //Send events for all tasks of m3.
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
@@ -879,22 +913,29 @@ public class TestShuffleVertexManager {
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
 
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
 
-    //Send an event for m2. But still we need to wait for at least 1 event from r1.
+    //Send events for m2. But still we need to wait for at least 1 event from r1.
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
+
+    // we need to wait for at least 1 event from r1 to make sure all vertices cross min threshold
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
 
     //Ensure that setVertexParallelism is not called for R2.
     verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
         anyMap());
 
     //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
-    when(mockContext_R2.getVertexNumTasks("R2")).thenReturn(1);
+    when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1);
 
     // complete configuration of r1 triggers the scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
     verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class),
         anyMap());
   
@@ -913,14 +954,13 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
     manager.onVertexStarted(emptyCompletions);
     Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
-    Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
-
     // Only need completed configuration notification from m3
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1032,12 +1072,12 @@ public class TestShuffleVertexManager {
 
     //Tasks should be scheduled in task 2, 0, 1 order
     long[] sizes = new long[]{(100 * 1000l * 1000l), (0l), (5000 * 1000l * 1000l)};
-    VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, "R2");
+    VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, r1);
     manager.onVertexManagerEventReceived(vmEvent); //send VM event
 
     //stats from another vertex (more of empty stats)
     sizes = new long[]{(0l), (0l), (0l)};
-    vmEvent = getVertexManagerEvent(sizes, 1060000000, "R2");
+    vmEvent = getVertexManagerEvent(sizes, 1060000000, r1);
     manager.onVertexManagerEventReceived(vmEvent); //send VM event
 
     //Send an event for m2.
@@ -1139,7 +1179,7 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
-    //Send an event for m2.
+    //Send an event for m3.
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1197,8 +1237,9 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 0);
 
-    // event from m3 triggers scheduling. no need for m2 since it has 0 tasks
+    // event from m3 triggers scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
 
@@ -1215,6 +1256,8 @@ public class TestShuffleVertexManager {
 
     //Send 1 event for tasks of r1.
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);


Mime
View raw message