tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-1248. Reduce slow-start should special case 1 reducer runs. (Zhiyuan Yang via hitesh)
Date Mon, 11 Jul 2016 23:56:42 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 e25cb30ad -> 68e016f77


TEZ-1248. Reduce slow-start should special case 1 reducer runs. (Zhiyuan Yang via hitesh)

(cherry picked from commit 8131896b393995e4c9de955847a107c7f000c7fb)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/68e016f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/68e016f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/68e016f7

Branch: refs/heads/branch-0.8
Commit: 68e016f77a7e4537d4509dfbca1de16954abe998
Parents: e25cb30
Author: Hitesh Shah <hitesh@apache.org>
Authored: Mon Jul 11 16:55:37 2016 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Mon Jul 11 16:56:33 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../vertexmanager/ShuffleVertexManager.java     |  6 ++-
 .../vertexmanager/TestShuffleVertexManager.java | 42 +++++++++++++++++---
 3 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/68e016f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ee8502c..5ddc8e6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-1248. Reduce slow-start should special case 1 reducer runs.
 
 Release 0.8.4: 2016-07-08
 

http://git-wip-us.apache.org/repos/asf/tez/blob/68e016f7/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index aee8b6f..6104a1d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -961,8 +961,10 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     
     tasksFractionToSchedule = Math.max(0, Math.min(1, tasksFractionToSchedule));
 
-    int numTasksToSchedule = 
-        ((int)(tasksFractionToSchedule * totalTasksToSchedule) - 
+    // round up to avoid the corner case that single task cannot be scheduled until src completed
+    // fraction reach max
+    int numTasksToSchedule =
+        ((int)(Math.ceil(tasksFractionToSchedule * totalTasksToSchedule)) -
          (totalTasksToSchedule - numPendingTasks));
     
     if (numTasksToSchedule > 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/68e016f7/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index df1f080..2566c94 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -408,8 +408,8 @@ public class TestShuffleVertexManager {
     verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
anyMap());
     verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class),
anyMap());
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
-    Assert.assertEquals(1, manager.pendingTasks.size());
-    Assert.assertEquals(1, scheduledTasks.size());
+    Assert.assertEquals(0, manager.pendingTasks.size());
+    Assert.assertEquals(2, scheduledTasks.size());
     Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(3, manager.numVertexManagerEventsReceived);
     Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize);
@@ -805,13 +805,13 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
-    Assert.assertTrue(manager.pendingTasks.size() == 2);
-    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.pendingTasks.size() == 1);
+    Assert.assertTrue(scheduledTasks.size() == 2); // 2 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
-    Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
     scheduledTasks.clear();
     manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we
are done. no action
@@ -845,6 +845,38 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
 
+    // if there is single task to schedule, it should be schedule when src completed
+    // fraction is more than min slow start fraction
+    scheduledTasks.clear();
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(1);
+    manager = createManager(conf, mockContext, 0.25f, 0.75f);
+    manager.onVertexStarted(emptyCompletions);
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+    Assert.assertTrue(manager.pendingTasks.size() == 1); // no tasks scheduled
+    Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+    Assert.assertTrue(manager.pendingTasks.size() == 1);
+    Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+    Assert.assertTrue(manager.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+    scheduledTasks.clear();
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
+    Assert.assertTrue(manager.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
+    scheduledTasks.clear();
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we
are done. no action
+    Assert.assertTrue(manager.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
+    Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
   }
 
 


Mime
View raw message