tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks is 0 (Rajesh Balamohan and Bikas Saha)
Date Wed, 25 Nov 2015 02:48:36 GMT
Repository: tez
Updated Branches:
  refs/heads/master d9560b904 -> c4487f966


TEZ-2956. Handle auto-reduce parallelism when the totalNumBipartiteSourceTasks is 0 (Rajesh Balamohan and Bikas Saha)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c4487f96
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c4487f96
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c4487f96

Branch: refs/heads/master
Commit: c4487f966a81c01ed061d502a397e2cf3b4bce44
Parents: d9560b9
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Nov 24 18:48:23 2015 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Nov 24 18:48:23 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../vertexmanager/ShuffleVertexManager.java     | 21 ++++++++++--------
 .../vertexmanager/TestShuffleVertexManager.java | 23 ++++++++++++++------
 3 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c4487f96/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 990c681..59847ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-2956. Handle auto-reduce parallelism when the
+  totalNumBipartiteSourceTasks is 0
   TEZ-2947. Tez UI: Timeline, RM & AM requests gets into a consecutive loop in counters page without any delay
   TEZ-2946. Tez UI: At times RM return a huge error message making the yellow error bar to fill the whole screen
   TEZ-2949. Allow duplicate dag names within session for Tez.

http://git-wip-us.apache.org/repos/asf/tez/blob/c4487f96/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 5fb4df9..f10c89a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -641,12 +641,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
    */
   @VisibleForTesting
   boolean determineParallelismAndApply() {
-    if(numBipartiteSourceTasksCompleted == 0) {
-      return true;
-    }
-    
     if(numVertexManagerEventsReceived == 0) {
-      return true;
+      if (totalNumBipartiteSourceTasks > 0) {
+        return true;
+      }
     }
     
     int currentParallelism = pendingTasks.size();
@@ -669,8 +667,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       return false;
     }
 
-    long expectedTotalSourceTasksOutputSize =
-        (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+    long expectedTotalSourceTasksOutputSize = 0;
+    if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) {
+      expectedTotalSourceTasksOutputSize =
+          (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+    }
 
     int desiredTaskParallelism = 
         (int)(
@@ -770,8 +771,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       }
       getContext().doneReconfiguringVertex();
     }
-    //Sort in case partition stats are available
-    sortPendingTasksBasedOnDataSize();
+    if (totalNumBipartiteSourceTasks > 0) {
+      //Sort in case partition stats are available
+      sortPendingTasksBasedOnDataSize();
+    }
     List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
 
     while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/c4487f96/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 965e99c..862e4df 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -186,7 +186,8 @@ public class TestShuffleVertexManager {
 
     doAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) throws Exception {
-          when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+          final int numTasks = ((Integer)invocation.getArguments()[0]).intValue();
+          when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(numTasks);
           newEdgeManagers.clear();
           for (Entry<String, EdgeProperty> entry :
               ((Map<String, EdgeProperty>)invocation.getArguments()[2]).entrySet()) {
@@ -216,7 +217,7 @@ public class TestShuffleVertexManager {
 
               @Override
               public int getDestinationVertexNumTasks() {
-                return 2;
+                return numTasks;
               }
             };
             EdgeManagerPlugin edgeManager = ReflectionUtils
@@ -226,7 +227,7 @@ public class TestShuffleVertexManager {
             newEdgeManagers.put(entry.getKey(), edgeManager);
           }
           return null;
-      }}).when(mockContext).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
+      }}).when(mockContext).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
@@ -244,12 +245,15 @@ public class TestShuffleVertexManager {
     // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
+    verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
-    Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
     scheduledTasks.clear();
     // TODO TEZ-1714 locking verify(mockContext, times(1)).vertexManagerDone(); // notified after scheduling all tasks
 
     // check scheduling only after onVertexStarted
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
     // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling
@@ -260,13 +264,16 @@ public class TestShuffleVertexManager {
     // trigger start and processing of pending notification events
     manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
-    Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed
 
     
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
 
     VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
     // parallelism not change due to large data size
@@ -280,11 +287,13 @@ public class TestShuffleVertexManager {
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
-    verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex();
     // trigger scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
+    verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+        (VertexLocationHint.class), anyMap());
     verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());


Mime
View raw message