Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A52D811D66 for ; Sat, 23 Aug 2014 02:18:25 +0000 (UTC) Received: (qmail 43285 invoked by uid 500); 23 Aug 2014 02:18:25 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 43250 invoked by uid 500); 23 Aug 2014 02:18:25 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 43241 invoked by uid 99); 23 Aug 2014 02:18:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 Aug 2014 02:18:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 579B49C8FE0; Sat, 23 Aug 2014 02:18:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rbalamohan@apache.org To: commits@tez.apache.org Message-Id: <78c9a73e9b0044c2a49a51ec723643dc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1448. Make WeightedScalingMemoryDistributor as the default memory distributor (Rajesh Balamohan) Date: Sat, 23 Aug 2014 02:18:25 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 81a02833c -> b00bde76c TEZ-1448. Make WeightedScalingMemoryDistributor as the default memory distributor (Rajesh Balamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b00bde76 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b00bde76 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b00bde76 Branch: refs/heads/master Commit: b00bde76cb269327f62232f2cd5ba618ad39fb12 Parents: 81a0283 Author: Rajesh Balamohan Authored: Sat Aug 23 07:48:03 2014 +0530 Committer: Rajesh Balamohan Committed: Sat Aug 23 07:48:03 2014 +0530 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 2 +- .../TestLogicalIOProcessorRuntimeTask.java | 3 ++ .../WeightedScalingMemoryDistributor.java | 29 ++++++++++++++++---- .../TestWeightedScalingMemoryDistributor.java | 17 ++---------- 4 files changed, 29 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b00bde76/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index bad33cb..b740638 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -397,7 +397,7 @@ public class TezConfiguration extends Configuration { + "scale.task.memory.allocator.class"; @Private public static final String TEZ_TASK_SCALE_TASK_MEMORY_ALLOCATOR_CLASS_DEFAULT = - "org.apache.tez.runtime.common.resources.ScalingAllocator"; + "org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor"; /** * The fraction of the JVM memory which will not be considered for allocation. http://git-wip-us.apache.org/repos/asf/tez/blob/b00bde76/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index d4374f9..62a5c8c 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezUmbilical; +import org.apache.tez.runtime.common.resources.ScalingAllocator; import org.junit.Test; import com.google.common.collect.HashMultimap; @@ -65,6 +66,8 @@ public class TestLogicalIOProcessorRuntimeTask { Multimap startedInputsMap = HashMultimap.create(); TezUmbilical umbilical = mock(TezUmbilical.class); TezConfiguration tezConf = new TezConfiguration(); + tezConf.set(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_ALLOCATOR_CLASS, + ScalingAllocator.class.getName()); TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1); TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1"); http://git-wip-us.apache.org/repos/asf/tez/blob/b00bde76/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java index c6eeced..7ca6e19 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java @@ -47,13 +47,13 @@ import com.google.common.collect.Maps; /** * Distributes memory between various requesting components by applying a * weighted scaling function. Overall, ensures that all requestors stay within the JVM limits. - * + * * Configuration involves specifying weights for the different Inputs available * in the tez-runtime-library. As an example, SortedShuffle : SortedOutput : * UnsortedShuffle could be configured to be 20:10:1. In this case, if both * SortedShuffle and UnsortedShuffle ask for the same amount of initial memory, * SortedShuffle will be given 20 times more; both may be scaled down to fit within the JVM though. - * + * */ @Public @Unstable @@ -61,8 +61,10 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator private static final Log LOG = LogFactory.getLog(WeightedScalingMemoryDistributor.class); - static final double MAX_ADDITIONAL_RESERVATION_FRACTION_PER_IO = 0.3d; - static final double RESERVATION_FRACTION_PER_IO = 0.025d; + static final double MAX_ADDITIONAL_RESERVATION_FRACTION_PER_IO = 0.1d; + static final double RESERVATION_FRACTION_PER_IO = 0.015d; + static final String[] DEFAULT_TASK_MEMORY_WEIGHTED_RATIOS = + generateWeightStrings(1, 1, 12, 12, 1, 1); private Configuration conf; @@ -72,7 +74,8 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator @Private @VisibleForTesting public enum RequestType { - PARTITIONED_UNSORTED_OUTPUT, UNSORTED_INPUT, SORTED_OUTPUT, SORTED_MERGED_INPUT, PROCESSOR, OTHER + PARTITIONED_UNSORTED_OUTPUT, UNSORTED_INPUT, UNSORTED_OUTPUT, SORTED_OUTPUT, + SORTED_MERGED_INPUT, PROCESSOR, OTHER }; private EnumMap typeScaleMap = Maps.newEnumMap(RequestType.class); @@ -197,7 +200,8 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator } private void populateTypeScaleMap() { - String[] ratios = conf.getStrings(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS); + String[] ratios = conf.getStrings(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS, + DEFAULT_TASK_MEMORY_WEIGHTED_RATIOS); int numExpectedValues = RequestType.values().length; if (ratios == null) { LOG.info("No ratio specified. Falling back to Linear scaling"); @@ -261,6 +265,19 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator return reserveFraction; } + public static String[] generateWeightStrings(int unsortedPartitioned, int broadcastIn, + int sortedOut, int scatterGatherShuffleIn, int proc, int other) { + String[] weights = new String[RequestType.values().length]; + weights[0] = RequestType.PARTITIONED_UNSORTED_OUTPUT.name() + ":" + unsortedPartitioned; + weights[1] = RequestType.UNSORTED_OUTPUT.name() + ":" + 0; + weights[2] = RequestType.UNSORTED_INPUT.name() + ":" + broadcastIn; + weights[3] = RequestType.SORTED_OUTPUT.name() + ":" + sortedOut; + weights[4] = RequestType.SORTED_MERGED_INPUT.name() + ":" + scatterGatherShuffleIn; + weights[5] = RequestType.PROCESSOR.name() + ":" + proc; + weights[6] = RequestType.OTHER.name() + ":" + other; + return weights; + } + @Override public void setConf(Configuration conf) { this.conf = conf; http://git-wip-us.apache.org/repos/asf/tez/blob/b00bde76/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java index e6888fd..881fa4c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java @@ -35,7 +35,6 @@ import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; import org.apache.tez.runtime.library.input.UnorderedKVInput; import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; import org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor; -import org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor.RequestType; import org.junit.Test; import com.google.common.base.Joiner; @@ -55,7 +54,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor public void testSimpleWeightedScaling() { Configuration conf = new Configuration(this.conf); conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS, - generateWeightStrings(1, 2, 3, 1, 1)); + WeightedScalingMemoryDistributor.generateWeightStrings(0, 1, 2, 3, 1, 1)); System.err.println(Joiner.on(",").join(conf.getStringCollection( TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS))); @@ -102,7 +101,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor public void testAdditionalReserveFractionWeightedScaling() { Configuration conf = new Configuration(this.conf); conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS, - generateWeightStrings(2, 3, 6, 1, 1)); + WeightedScalingMemoryDistributor.generateWeightStrings(0, 2, 3, 6, 1, 1)); conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO, 0.025d); conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX, 0.2d); @@ -167,16 +166,4 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor return desc; } - private String[] generateWeightStrings(int broadcastIn, int sortedOut, - int scatterGatherShuffleIn, int proc, int other) { - String[] weights = new String[RequestType.values().length]; - weights[0] = RequestType.PARTITIONED_UNSORTED_OUTPUT + ":" + 0; - weights[1] = RequestType.UNSORTED_INPUT.name() + ":" + broadcastIn; - weights[2] = RequestType.SORTED_OUTPUT.name() + ":" + sortedOut; - weights[3] = RequestType.SORTED_MERGED_INPUT.name() + ":" + scatterGatherShuffleIn; - weights[4] = RequestType.PROCESSOR.name() + ":" + proc; - weights[5] = RequestType.OTHER.name() + ":" + other; - return weights; - } - }