tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject git commit: TEZ-1448. Make WeightedScalingMemoryDistributor as the default memory distributor (Rajesh Balamohan)
Date Sat, 23 Aug 2014 02:18:25 GMT
Repository: tez
Updated Branches:
  refs/heads/master 81a02833c -> b00bde76c


TEZ-1448. Make WeightedScalingMemoryDistributor as the default memory distributor (Rajesh
Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b00bde76
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b00bde76
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b00bde76

Branch: refs/heads/master
Commit: b00bde76cb269327f62232f2cd5ba618ad39fb12
Parents: 81a0283
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Sat Aug 23 07:48:03 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Sat Aug 23 07:48:03 2014 +0530

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  2 +-
 .../TestLogicalIOProcessorRuntimeTask.java      |  3 ++
 .../WeightedScalingMemoryDistributor.java       | 29 ++++++++++++++++----
 .../TestWeightedScalingMemoryDistributor.java   | 17 ++----------
 4 files changed, 29 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b00bde76/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index bad33cb..b740638 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -397,7 +397,7 @@ public class TezConfiguration extends Configuration {
       + "scale.task.memory.allocator.class";
   @Private
   public static final String TEZ_TASK_SCALE_TASK_MEMORY_ALLOCATOR_CLASS_DEFAULT =
-      "org.apache.tez.runtime.common.resources.ScalingAllocator";
+      "org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor";
 
   /**
    * The fraction of the JVM memory which will not be considered for allocation.

http://git-wip-us.apache.org/repos/asf/tez/blob/b00bde76/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index d4374f9..62a5c8c 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.common.resources.ScalingAllocator;
 import org.junit.Test;
 
 import com.google.common.collect.HashMultimap;
@@ -65,6 +66,8 @@ public class TestLogicalIOProcessorRuntimeTask {
     Multimap<String, String> startedInputsMap = HashMultimap.create();
     TezUmbilical umbilical = mock(TezUmbilical.class);
     TezConfiguration tezConf = new TezConfiguration();
+    tezConf.set(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_ALLOCATOR_CLASS,
+        ScalingAllocator.class.getName());
 
     TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
     TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1");

http://git-wip-us.apache.org/repos/asf/tez/blob/b00bde76/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index c6eeced..7ca6e19 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -47,13 +47,13 @@ import com.google.common.collect.Maps;
 /**
  * Distributes memory between various requesting components by applying a
  * weighted scaling function. Overall, ensures that all requestors stay within the JVM limits.
- * 
+ *
  * Configuration involves specifying weights for the different Inputs available
  * in the tez-runtime-library. As an example, SortedShuffle : SortedOutput :
  * UnsortedShuffle could be configured to be 20:10:1. In this case, if both
  * SortedShuffle and UnsortedShuffle ask for the same amount of initial memory,
  * SortedShuffle will be given 20 times more; both may be scaled down to fit within the JVM
though.
- * 
+ *
  */
 @Public
 @Unstable
@@ -61,8 +61,10 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
 
   private static final Log LOG = LogFactory.getLog(WeightedScalingMemoryDistributor.class);
 
-  static final double MAX_ADDITIONAL_RESERVATION_FRACTION_PER_IO = 0.3d;
-  static final double RESERVATION_FRACTION_PER_IO = 0.025d;
+  static final double MAX_ADDITIONAL_RESERVATION_FRACTION_PER_IO = 0.1d;
+  static final double RESERVATION_FRACTION_PER_IO = 0.015d;
+  static final String[] DEFAULT_TASK_MEMORY_WEIGHTED_RATIOS =
+      generateWeightStrings(1, 1, 12, 12, 1, 1);
 
   private Configuration conf;
 
@@ -72,7 +74,8 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
   @Private
   @VisibleForTesting
   public enum RequestType {
-    PARTITIONED_UNSORTED_OUTPUT, UNSORTED_INPUT, SORTED_OUTPUT, SORTED_MERGED_INPUT, PROCESSOR,
OTHER
+    PARTITIONED_UNSORTED_OUTPUT, UNSORTED_INPUT, UNSORTED_OUTPUT, SORTED_OUTPUT,
+    SORTED_MERGED_INPUT, PROCESSOR, OTHER
   };
 
   private EnumMap<RequestType, Integer> typeScaleMap = Maps.newEnumMap(RequestType.class);
@@ -197,7 +200,8 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
   }
 
   private void populateTypeScaleMap() {
-    String[] ratios = conf.getStrings(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS);
+    String[] ratios = conf.getStrings(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS,
+        DEFAULT_TASK_MEMORY_WEIGHTED_RATIOS);
     int numExpectedValues = RequestType.values().length;
     if (ratios == null) {
       LOG.info("No ratio specified. Falling back to Linear scaling");
@@ -261,6 +265,19 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
     return reserveFraction;
   }
 
+  public static String[] generateWeightStrings(int unsortedPartitioned, int broadcastIn,
+      int sortedOut, int scatterGatherShuffleIn, int proc, int other) {
+    String[] weights = new String[RequestType.values().length];
+    weights[0] = RequestType.PARTITIONED_UNSORTED_OUTPUT.name() + ":" + unsortedPartitioned;
+    weights[1] = RequestType.UNSORTED_OUTPUT.name() + ":" + 0;
+    weights[2] = RequestType.UNSORTED_INPUT.name() + ":" + broadcastIn;
+    weights[3] = RequestType.SORTED_OUTPUT.name() + ":" + sortedOut;
+    weights[4] = RequestType.SORTED_MERGED_INPUT.name() + ":" + scatterGatherShuffleIn;
+    weights[5] = RequestType.PROCESSOR.name() + ":" + proc;
+    weights[6] = RequestType.OTHER.name() + ":" + other;
+    return weights;
+  }
+
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/tez/blob/b00bde76/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
index e6888fd..881fa4c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
@@ -35,7 +35,6 @@ import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
 import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
 import org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor;
-import org.apache.tez.runtime.library.resources.WeightedScalingMemoryDistributor.RequestType;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -55,7 +54,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
   public void testSimpleWeightedScaling() {
     Configuration conf = new Configuration(this.conf);
     conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS,
-        generateWeightStrings(1, 2, 3, 1, 1));
+        WeightedScalingMemoryDistributor.generateWeightStrings(0, 1, 2, 3, 1, 1));
     System.err.println(Joiner.on(",").join(conf.getStringCollection(
         TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS)));
 
@@ -102,7 +101,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
   public void testAdditionalReserveFractionWeightedScaling() {
     Configuration conf = new Configuration(this.conf);
     conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_WEIGHTED_RATIOS,
-        generateWeightStrings(2, 3, 6, 1, 1));
+        WeightedScalingMemoryDistributor.generateWeightStrings(0, 2, 3, 6, 1, 1));
     conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO,
0.025d);
     conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_TASK_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX,
0.2d);
 
@@ -167,16 +166,4 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
     return desc;
   }
 
-  private String[] generateWeightStrings(int broadcastIn, int sortedOut,
-      int scatterGatherShuffleIn, int proc, int other) {
-    String[] weights = new String[RequestType.values().length];
-    weights[0] = RequestType.PARTITIONED_UNSORTED_OUTPUT + ":" + 0;
-    weights[1] = RequestType.UNSORTED_INPUT.name() + ":" + broadcastIn;
-    weights[2] = RequestType.SORTED_OUTPUT.name() + ":" + sortedOut;
-    weights[3] = RequestType.SORTED_MERGED_INPUT.name() + ":" + scatterGatherShuffleIn;
-    weights[4] = RequestType.PROCESSOR.name() + ":" + proc;
-    weights[5] = RequestType.OTHER.name() + ":" + other;
-    return weights;
-  }
-
 }


Mime
View raw message