tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-1963. Fix post memory merge to be > 2 GB (Rajesh Balamohan)
Date Mon, 19 Jan 2015 22:50:48 GMT
Repository: tez
Updated Branches:
  refs/heads/master 4bc64b5c3 -> b4885e470


TEZ-1963. Fix post memory merge to be > 2 GB (Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b4885e47
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b4885e47
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b4885e47

Branch: refs/heads/master
Commit: b4885e4706af7f4decf756ee824c6cf23245ec3a
Parents: 4bc64b5
Author: Rajesh Balamohan <rbalamohan@hortonworks.com>
Authored: Tue Jan 20 04:20:28 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@hortonworks.com>
Committed: Tue Jan 20 04:20:28 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../shuffle/orderedgrouped/MergeManager.java    | 15 ++--
 .../orderedgrouped/TestMergeManager.java        | 92 +++++++++++++++++++-
 3 files changed, 99 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b4885e47/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b7b2b0..8714f78 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1963. Fix post memory merge to be > 2 GB.
   TEZ-1901. Fix findbugs warnings in tez-examples.
   TEZ-1941. Memory provided by *Context.getAvailableMemory needs to be setup explicitly.
   TEZ-1879. Create local UGI instances for each task and the AM, when running in LocalMode.

http://git-wip-us.apache.org/repos/asf/tez/blob/b4885e47/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 1524418..54266b4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -98,7 +98,8 @@ public class MergeManager {
   final OnDiskMerger onDiskMerger;
   
   private final long memoryLimit;
-  private final int postMergeMemLimit;
+  @VisibleForTesting
+  final long postMergeMemLimit;
   private long usedMemory;
   private long commitMemory;
   private final int ioSortFactor;
@@ -201,9 +202,8 @@ public class MergeManager {
     if (maxRedPer > 1.0 || maxRedPer < 0.0) {
       throw new TezUncheckedException(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + maxRedPer);
     }
-    // TODO maxRedBuffer should be a long.
-    int maxRedBuffer = (int) Math.min(inputContext.getTotalMemoryAvailableToTask() * maxRedPer,
-        Integer.MAX_VALUE);
+
+    long maxRedBuffer = (long) (inputContext.getTotalMemoryAvailableToTask() * maxRedPer);
     // Figure out initial memory req end
     
     if (this.initialMemoryAvailable < memLimit) {
@@ -213,7 +213,7 @@ public class MergeManager {
     }
     
     if (this.initialMemoryAvailable < maxRedBuffer) {
-      this.postMergeMemLimit = (int) this.initialMemoryAvailable;
+      this.postMergeMemLimit = this.initialMemoryAvailable;
     } else {
       this.postMergeMemLimit = maxRedBuffer;
     }
@@ -315,9 +315,8 @@ public class MergeManager {
       if (maxRedPer > 1.0 || maxRedPer < 0.0) {
         throw new TezUncheckedException(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + maxRedPer);
       }
-      // TODO maxRedBuffer should be a long.
-      int maxRedBuffer = (int) Math.min(maxAvailableTaskMemory * maxRedPer,
-          Integer.MAX_VALUE);
+      long maxRedBuffer = (long) (maxAvailableTaskMemory * maxRedPer);
+
       LOG.info("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
 
       long reqMem = Math.max(maxRedBuffer, memLimit);

http://git-wip-us.apache.org/repos/asf/tez/blob/b4885e47/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 7615ba7..9be75f8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -42,6 +42,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -81,6 +82,91 @@ public class TestMergeManager {
   }
 
   @Test(timeout = 10000)
+  public void testConfigs() throws IOException {
+    long maxTaskMem = 8192 * 1024 * 1024l;
+
+    //Test Shuffle fetch buffer and post merge buffer percentage
+    Configuration conf = new TezConfiguration(defaultConf);
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.8f);
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.5f);
+    Assert.assertTrue(MergeManager.getInitialMemoryRequirement(conf, maxTaskMem) == 6871947776l);
+
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.5f);
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.5f);
+    Assert.assertTrue(MergeManager.getInitialMemoryRequirement(conf, maxTaskMem) > Integer.MAX_VALUE);
+
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.4f);
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.9f);
+    Assert.assertTrue(MergeManager.getInitialMemoryRequirement(conf, maxTaskMem) > Integer.MAX_VALUE);
+
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.1f);
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.1f);
+    Assert.assertTrue(MergeManager.getInitialMemoryRequirement(conf, maxTaskMem) < Integer.MAX_VALUE);
+
+    try {
+      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 2.4f);
+      MergeManager.getInitialMemoryRequirement(conf, maxTaskMem);
+      Assert.fail("Should have thrown wrong buffer percent configuration exception");
+    } catch(IllegalArgumentException ie) {
+    }
+
+    try {
+      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, -2.4f);
+      MergeManager.getInitialMemoryRequirement(conf, maxTaskMem);
+      Assert.fail("Should have thrown wrong buffer percent configuration exception");
+    } catch(IllegalArgumentException ie) {
+    }
+
+    try {
+      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 1.4f);
+      MergeManager.getInitialMemoryRequirement(conf, maxTaskMem);
+      Assert.fail("Should have thrown wrong post merge buffer percent configuration exception");
+    } catch(IllegalArgumentException ie) {
+    }
+
+    try {
+      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, -1.4f);
+      MergeManager.getInitialMemoryRequirement(conf, maxTaskMem);
+      Assert.fail("Should have thrown wrong post merge buffer percent configuration exception");
+    } catch(IllegalArgumentException ie) {
+    }
+
+    try {
+      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 1.4f);
+      MergeManager.getInitialMemoryRequirement(conf, maxTaskMem);
+      Assert.fail("Should have thrown wrong shuffle fetch buffer percent configuration exception");
+    } catch(IllegalArgumentException ie) {
+    }
+
+    try {
+      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, -1.4f);
+      MergeManager.getInitialMemoryRequirement(conf, maxTaskMem);
+      Assert.fail("Should have thrown wrong shuffle fetch buffer percent configuration exception");
+    } catch(IllegalArgumentException ie) {
+    }
+
+    //test post merge mem limit
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.4f);
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.8f);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    InputContext t0inputContext = createMockInputContext(UUID.randomUUID().toString(), maxTaskMem);
+    ExceptionReporter t0exceptionReporter = mock(ExceptionReporter.class);
+    long initialMemoryAvailable = (long) (maxTaskMem * 0.8);
+    MergeManager mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, t0inputContext, null, null, null, null,
+            t0exceptionReporter, initialMemoryAvailable, null, false, -1);
+    Assert.assertTrue(mergeManager.postMergeMemLimit > Integer.MAX_VALUE);
+
+    initialMemoryAvailable = 200 * 1024 * 1024l; //initial mem < memlimit
+    mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, t0inputContext, null, null, null, null,
+            t0exceptionReporter, initialMemoryAvailable, null, false, -1);
+    Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable);
+  }
+
+  @Test(timeout = 10000)
   public void testLocalDiskMergeMultipleTasks() throws IOException {
 
     Configuration conf = new TezConfiguration(defaultConf);
@@ -190,9 +276,13 @@ public class TestMergeManager {
   }
 
   private InputContext createMockInputContext(String uniqueId) {
+    return createMockInputContext(uniqueId, 200 * 1024 * 1024l);
+  }
+
+  private InputContext createMockInputContext(String uniqueId, long mem) {
     InputContext inputContext = mock(InputContext.class);
     doReturn(new TezCounters()).when(inputContext).getCounters();
-    doReturn(200 * 1024 * 1024l).when(inputContext).getTotalMemoryAvailableToTask();
+    doReturn(mem).when(inputContext).getTotalMemoryAvailableToTask();
     doReturn("srcVertexName").when(inputContext).getSourceVertexName();
     doReturn(uniqueId).when(inputContext).getUniqueIdentifier();
     return inputContext;


Mime
View raw message