tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1261. ShuffleVertexManager auto parallelism is broken after TEZ-1131 (bikas)
Date Wed, 09 Jul 2014 00:05:13 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 3535b142f -> ca09b9626


TEZ-1261. ShuffleVertexManager auto parallelism is broken after TEZ-1131 (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ca09b962
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ca09b962
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ca09b962

Branch: refs/heads/master
Commit: ca09b96263d2fba6684700ba22ef3f237cc1c235
Parents: 3535b14
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Jul 8 17:04:14 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Jul 8 17:04:49 2014 -0700

----------------------------------------------------------------------
 .../vertexmanager/ShuffleVertexManager.java     | 18 ++++++-------
 .../vertexmanager/TestShuffleVertexManager.java | 28 +++++++++++---------
 2 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ca09b962/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index c8b406d..c8fd1e6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -113,7 +113,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   boolean enableAutoParallelism = false;
   boolean parallelismDetermined = false;
   
-  int numSourceTasks = 0;
+  int totalNumSourceTasks = 0;
   int numSourceTasksCompleted = 0;
   int numVertexManagerEventsReceived = 0;
   List<Integer> pendingTasks;
@@ -304,7 +304,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     updateSourceTaskCount();
     
     LOG.info("OnVertexStarted vertex: " + context.getVertexName() + 
-             " with " + numSourceTasks + " source tasks and " + 
+             " with " + totalNumSourceTasks + " source tasks and " + 
              totalTasksToSchedule + " pending tasks");
     
     if (completions != null) {
@@ -369,7 +369,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     for(String vertex : bipartiteSources.keySet()) {
       numSrcTasks += context.getVertexNumTasks(vertex);
     }
-    numSourceTasks = numSrcTasks;
+    totalNumSourceTasks = numSrcTasks;
   }
 
   void determineParallelismAndApply() {
@@ -383,7 +383,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     
     int currentParallelism = pendingTasks.size();
     long expectedTotalSourceTasksOutputSize = 
-        (numSourceTasks*completedSourceTasksOutputSize)/numVertexManagerEventsReceived;
+        (totalNumSourceTasks*completedSourceTasksOutputSize)/numVertexManagerEventsReceived;
     int desiredTaskParallelism = 
         (int)(
             (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
@@ -427,7 +427,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         CustomShuffleEdgeManagerConfig edgeManagerConfig =
             new CustomShuffleEdgeManagerConfig(
                 currentParallelism, finalTaskParallelism, 
-                numSourceTasks, basePartitionRange,
+                context.getVertexNumTasks(vertex), basePartitionRange,
                 ((remainderRangeForLastShuffler > 0) ?
                     remainderRangeForLastShuffler : basePartitionRange));
         EdgeManagerDescriptor edgeManagerDescriptor =
@@ -469,7 +469,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       return;
     }
     
-    if (numSourceTasksCompleted == numSourceTasks && numPendingTasks > 0) {
+    if (numSourceTasksCompleted == totalNumSourceTasks && numPendingTasks > 0) {
       LOG.info("All source tasks assigned. " +
           "Ramping up " + numPendingTasks + 
           " remaining tasks for vertex: " + context.getVertexName());
@@ -478,8 +478,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
 
     float completedSourceTaskFraction = 0f;
-    if (numSourceTasks != 0) { // support for 0 source tasks
-      completedSourceTaskFraction = (float)numSourceTasksCompleted/numSourceTasks;
+    if (totalNumSourceTasks != 0) { // support for 0 source tasks
+      completedSourceTaskFraction = (float)numSourceTasksCompleted/totalNumSourceTasks;
     } else {
       completedSourceTaskFraction = 1;
     }
@@ -517,7 +517,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       LOG.info("Scheduling " + numTasksToSchedule + " tasks for vertex: " + 
                context.getVertexName() + " with totalTasks: " + 
                totalTasksToSchedule + ". " + numSourceTasksCompleted + 
-               " source tasks completed out of " + numSourceTasks + 
+               " source tasks completed out of " + totalNumSourceTasks + 
                ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + 
                " min: " + slowStartMinSrcCompletionFraction + 
                " max: " + slowStartMaxSrcCompletionFraction);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ca09b962/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index fbeba3a..f45a7d4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -179,7 +179,7 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
     manager.onVertexStarted(null);
     Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
     // managedVertex tasks reduced
@@ -199,11 +199,11 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext, 0.5f, 0.5f);
     manager.onVertexStarted(null);
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(4, manager.numSourceTasks);
+    Assert.assertEquals(4, manager.totalNumSourceTasks);
     // task completion from non-bipartite stage does nothing
     manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
-    Assert.assertEquals(4, manager.numSourceTasks);
+    Assert.assertEquals(4, manager.totalNumSourceTasks);
     Assert.assertEquals(0, manager.numSourceTasksCompleted);
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
@@ -241,6 +241,10 @@ public class TestShuffleVertexManager {
     EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
     Map<Integer, List<Integer>> targets = Maps.newHashMap();
     DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
+    // 4 source task outputs - same as original number of partitions
+    Assert.assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(0));
+    // 4 destination task inputs - 2 source tasks + 2 merged partitions
+    Assert.assertEquals(4, edgeManager.getNumDestinationTaskPhysicalInputs(0));
     edgeManager.routeDataMovementEventToDestination(dmEvent, 1, dmEvent.getSourceIndex(), targets);
     Assert.assertEquals(1, targets.size());
     Map.Entry<Integer, List<Integer>> e = targets.entrySet().iterator().next();
@@ -364,7 +368,7 @@ public class TestShuffleVertexManager {
     // source vertex have some tasks. min, max == 0
     manager = createManager(conf, mockContext, 0, 0);
     manager.onVertexStarted(null);
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     Assert.assertTrue(manager.totalTasksToSchedule == 3);
     Assert.assertTrue(manager.numSourceTasksCompleted == 0);
     Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -374,11 +378,11 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext, 0.25f, 0.25f);
     manager.onVertexStarted(null);
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
     manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     Assert.assertTrue(manager.numSourceTasksCompleted == 0);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -389,11 +393,11 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext, 1.0f, 1.0f);
     manager.onVertexStarted(null);
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
     manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     Assert.assertTrue(manager.numSourceTasksCompleted == 0);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
@@ -413,11 +417,11 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext, 1.0f, 1.0f);
     manager.onVertexStarted(null);
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
     manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     Assert.assertTrue(manager.numSourceTasksCompleted == 0);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
@@ -437,7 +441,7 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext, 0.25f, 0.75f);
     manager.onVertexStarted(null);
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
@@ -462,7 +466,7 @@ public class TestShuffleVertexManager {
     manager = createManager(conf, mockContext, 0.25f, 1.0f);
     manager.onVertexStarted(null);
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
-    Assert.assertTrue(manager.numSourceTasks == 4);
+    Assert.assertTrue(manager.totalNumSourceTasks == 4);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
     Assert.assertTrue(manager.pendingTasks.size() == 2);


Mime
View raw message