Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7B80B17D79 for ; Mon, 29 Sep 2014 00:35:09 +0000 (UTC) Received: (qmail 60105 invoked by uid 500); 29 Sep 2014 00:35:09 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 60050 invoked by uid 500); 29 Sep 2014 00:35:09 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 60040 invoked by uid 99); 29 Sep 2014 00:35:09 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Sep 2014 00:35:09 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 055E19B9CA1; Mon, 29 Sep 2014 00:35:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Date: Mon, 29 Sep 2014 00:35:42 -0000 Message-Id: <25e61b4542cc48c88879110f6ee79bfe@git.apache.org> In-Reply-To: <82bc55d5adc141e4922f88c793b00754@git.apache.org> References: <82bc55d5adc141e4922f88c793b00754@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [36/50] [abbrv] git commit: TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness (Rajesh Balamohan) TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness (Rajesh Balamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9159e117 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9159e117 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9159e117 Branch: refs/heads/branch-0.5 Commit: 9159e11700d3eff722fdda615565528182a619d2 Parents: 4023898 Author: Rajesh Balamohan Authored: Tue Sep 23 05:59:50 2014 +0530 Committer: Rajesh Balamohan Committed: Tue Sep 23 05:59:50 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../vertexmanager/ShuffleVertexManager.java | 60 ++++++++--- .../vertexmanager/TestShuffleVertexManager.java | 105 ++++++++++++++++++- 3 files changed, 146 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 407acc9..f3b2ed0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -56,6 +56,7 @@ ALL CHANGES TEZ-1550. TestEnvironmentUpdateUtils.testMultipleUpdateEnvironment fails on Windows TEZ-1554. Failing tests in TestMRHelpers related to environment on Windows + TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness Release 0.5.0: 2014-09-03 http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 85d46d5..2aaae16 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.library.vertexmanager; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; @@ -120,7 +121,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { int minTaskParallelism = 1; boolean enableAutoParallelism = false; boolean parallelismDetermined = false; - + int totalNumSourceTasks = 0; int numSourceTasksCompleted = 0; int numVertexManagerEventsReceived = 0; @@ -382,18 +383,43 @@ public class ShuffleVertexManager extends VertexManagerPlugin { totalNumSourceTasks = numSrcTasks; } - void determineParallelismAndApply() { + /** + * Compute optimal parallelism needed for the job + * @return true (if parallelism is determined), false otherwise + */ + @VisibleForTesting + boolean determineParallelismAndApply() { if(numSourceTasksCompleted == 0) { - return; + return true; } if(numVertexManagerEventsReceived == 0) { - return; + return true; } int currentParallelism = pendingTasks.size(); - long expectedTotalSourceTasksOutputSize = - (totalNumSourceTasks*completedSourceTasksOutputSize)/numVertexManagerEventsReceived; + /** + * When overall completed output size is not even equal to + * desiredTaskInputSize, we can wait for some more data to be available to determine + * better parallelism until max.fraction is reached. min.fraction is just a hint to the + * framework and need not be honored strictly in this case. + */ + boolean canDetermineParallelismLater = (completedSourceTasksOutputSize < + desiredTaskInputDataSize) + && (numSourceTasksCompleted < (totalNumSourceTasks * slowStartMaxSrcCompletionFraction)); + if (canDetermineParallelismLater) { + LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName() + + ", totalNumSourceTasks=" + totalNumSourceTasks + + ", completedSourceTasksOutputSize=" + completedSourceTasksOutputSize + + ", numVertexManagerEventsReceived=" + numVertexManagerEventsReceived + + ", numSourceTasksCompleted=" + numSourceTasksCompleted + ", maxThreshold=" + + (totalNumSourceTasks * slowStartMaxSrcCompletionFraction)); + return false; + } + + long expectedTotalSourceTasksOutputSize = + (totalNumSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived; + int desiredTaskParallelism = (int)( (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/ @@ -403,7 +429,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } if(desiredTaskParallelism >= currentParallelism) { - return; + return true; } // most shufflers will be assigned this range @@ -411,7 +437,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { if (basePartitionRange <= 1) { // nothing to do if range is equal 1 partition. shuffler does it by default - return; + return true; } int numShufflersWithBaseRange = currentParallelism / basePartitionRange; @@ -425,7 +451,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin { + " . Expected output: " + expectedTotalSourceTasksOutputSize + " based on actual output: " + completedSourceTasksOutputSize + " from " + numVertexManagerEventsReceived + " vertex manager events. " - + " desiredTaskInputSize: " + desiredTaskInputDataSize); + + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:" + + (totalNumSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:" + + numSourceTasksCompleted); if(finalTaskParallelism < currentParallelism) { // final parallelism is less than actual parallelism @@ -447,8 +475,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null); - updatePendingTasks(); + updatePendingTasks(); } + return true; } void schedulePendingTasks(int numTasksToSchedule) { @@ -460,9 +489,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // calculating parallelism or change parallelism while tasks are already // running then we can create other parameters to trigger this calculation. if(enableAutoParallelism && !parallelismDetermined) { - // do this once - parallelismDetermined = true; - determineParallelismAndApply(); + parallelismDetermined = determineParallelismAndApply(); + if (!parallelismDetermined) { + //try to determine parallelism later when more info is available. + return; + } } List scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule); while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) { @@ -498,8 +529,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // linearly increase the number of scheduled tasks such that all tasks are // scheduled when source tasks completed fraction reaches max float tasksFractionToSchedule = 1; - float percentRange = slowStartMaxSrcCompletionFraction - - slowStartMinSrcCompletionFraction; + float percentRange = slowStartMaxSrcCompletionFraction - slowStartMinSrcCompletionFraction; if (percentRange > 0) { tasksFractionToSchedule = (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/ http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 4768c6c..9ac8210 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -232,14 +232,105 @@ public class TestShuffleVertexManager { Assert.assertEquals(4, scheduledTasks.size()); Assert.assertEquals(1, manager.numSourceTasksCompleted); Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize); - - + + /** + * Test for TEZ-978 + * Delay determining parallelism until enough data has been received. + */ + scheduledTasks.clear(); + payload = + VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer(); + vmEvent = VertexManagerEvent.create("Vertex", payload); + + //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself. + manager = createManager(conf, mockContext, 0.01f, 0.75f); + manager.onVertexStarted(null); + Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(4, manager.totalNumSourceTasks); + Assert.assertEquals(0, manager.numSourceTasksCompleted); + + //First task in src1 completed with small payload + manager.onVertexManagerEventReceived(vmEvent); //small payload + manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + Assert.assertTrue(manager.determineParallelismAndApply() == false); + Assert.assertEquals(4, manager.pendingTasks.size()); + Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled + Assert.assertEquals(1, manager.numSourceTasksCompleted); + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); + Assert.assertEquals(1L, manager.completedSourceTasksOutputSize); + + //Second task in src1 completed with small payload + manager.onVertexManagerEventReceived(vmEvent); //small payload + manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later + Assert.assertTrue(manager.determineParallelismAndApply() == false); + Assert.assertEquals(4, manager.pendingTasks.size()); + Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled + Assert.assertEquals(1, manager.numSourceTasksCompleted); + Assert.assertEquals(2, manager.numVertexManagerEventsReceived); + Assert.assertEquals(2L, manager.completedSourceTasksOutputSize); + + //First task in src2 completed (with larger payload) to trigger determining parallelism + payload = + VertexManagerEventPayloadProto.newBuilder().setOutputSize(1200L).build().toByteString() + .asReadOnlyByteBuffer(); + vmEvent = VertexManagerEvent.create("Vertex", payload); + manager.onVertexManagerEventReceived(vmEvent); + Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined + verify(mockContext, times(1)).setVertexParallelism(eq(2), any(VertexLocationHint.class), + anyMap(), + anyMap()); + manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + Assert.assertEquals(1, manager.pendingTasks.size()); + Assert.assertEquals(1, scheduledTasks.size()); + Assert.assertEquals(2, manager.numSourceTasksCompleted); + Assert.assertEquals(3, manager.numVertexManagerEventsReceived); + Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize); + + //Test for max fraction. Min fraction is just instruction to framework, but honor max fraction + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20); + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40); + scheduledTasks.clear(); + payload = + VertexManagerEventPayloadProto.newBuilder().setOutputSize(100L).build().toByteString() + .asReadOnlyByteBuffer(); + vmEvent = VertexManagerEvent.create("Vertex", payload); + + //min/max fraction of 0.0/0.2 + manager = createManager(conf, mockContext, 0.0f, 0.2f); + manager.onVertexStarted(null); + Assert.assertEquals(40, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(40, manager.totalNumSourceTasks); + Assert.assertEquals(0, manager.numSourceTasksCompleted); + //send 7 events with payload size as 100 + for(int i=0;i<7;i++) { + manager.onVertexManagerEventReceived(vmEvent); //small payload + manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i)); + //should not change parallelism + verify(mockContext, times(0)).setVertexParallelism(eq(4), any(VertexLocationHint.class), + anyMap(), + anyMap()); + } + //send 8th event with payload size as 100 + manager.onVertexManagerEventReceived(vmEvent); + manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(8)); + //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism + verify(mockContext, times(1)).setVertexParallelism(eq(4), any(VertexLocationHint.class), + anyMap(), + anyMap()); + + //reset context for next test + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); + // parallelism changed due to small data size scheduledTasks.clear(); payload = VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer(); vmEvent = VertexManagerEvent.create("Vertex", payload); - + manager = createManager(conf, mockContext, 0.5f, 0.5f); manager.onVertexStarted(null); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled @@ -266,7 +357,9 @@ public class TestShuffleVertexManager { manager.onVertexManagerEventReceived(vmEvent); manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); // managedVertex tasks reduced - verify(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap()); + verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class), + anyMap(), + anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); // TODO improve tests for parallelism Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled @@ -279,7 +372,9 @@ public class TestShuffleVertexManager { // more completions dont cause recalculation of parallelism manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); - verify(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap()); + verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class), + anyMap(), + anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();