hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject hadoop git commit: MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge. Contributed by Gera Shegalov (cherry picked from commit 7dc3c1203d1ab14c09d0aaf0869a5bcdfafb0a5a)
Date Wed, 02 Sep 2015 22:30:45 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6.1 c90f025e9 -> 41601eae6


MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge. Contributed by Gera Shegalov
(cherry picked from commit 7dc3c1203d1ab14c09d0aaf0869a5bcdfafb0a5a)

(cherry picked from commit 87c2d915f1cc799cb4020c945c04d3ecb82ee963)
(cherry picked from commit 1da62ba736f5f161a18a52b7ca0d212786f3848c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/41601eae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/41601eae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/41601eae

Branch: refs/heads/branch-2.6.1
Commit: 41601eae6b266ce88e47a06a1dc765cfc296b640
Parents: c90f025
Author: Jason Lowe <jlowe@apache.org>
Authored: Mon May 4 19:02:39 2015 +0000
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Wed Sep 2 15:27:47 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../mapreduce/task/reduce/MergeManagerImpl.java | 47 +++++++++++---------
 .../mapreduce/task/reduce/TestMergeManager.java | 29 ++++++++++++
 3 files changed, 57 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/41601eae/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f4716d8..64b3b5c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -35,6 +35,9 @@ Release 2.6.1 - UNRELEASED
     MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the udpate of AM-RM
     tokens when they roll-over. (Jason Lowe via vinodkv)
 
+    MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
+    (Gera Shegalov via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41601eae/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index a4b1aa8..3699ddd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -93,8 +93,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
   
   Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
   private final OnDiskMerger onDiskMerger;
-  
-  private final long memoryLimit;
+
+  @VisibleForTesting
+  final long memoryLimit;
+
   private long usedMemory;
   private long commitMemory;
   private final long maxSingleShuffleLimit;
@@ -167,11 +169,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
 
     // Allow unit tests to fix Runtime memory
-    this.memoryLimit = 
-      (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
- 
+    this.memoryLimit = (long)(jobConf.getLong(
+        MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
+
     this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
 
     final float singleShuffleMemoryLimitPercent =
@@ -201,7 +202,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
 
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
       throw new RuntimeException("Invalid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold"
+          + "maxSingleShuffleLimit should be less than mergeThreshold "
           + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
           + "mergeThreshold: " + this.mergeThreshold);
     }
@@ -667,24 +668,26 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
   }
 
-  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
-                                       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
-                                       List<CompressAwarePath> onDiskMapOutputs
-                                       ) throws IOException {
-    LOG.info("finalMerge called with " + 
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
-             onDiskMapOutputs.size() + " on-disk map-outputs");
-    
+  @VisibleForTesting
+  final long getMaxInMemReduceLimit() {
     final float maxRedPer =
-      job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
+        jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
     if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
-                            maxRedPer);
+      throw new RuntimeException(maxRedPer + ": "
+          + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT
+          + " must be a float between 0 and 1.0");
     }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-    
+    return (long)(memoryLimit * maxRedPer);
+  }
 
+  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+                                       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+                                       List<CompressAwarePath> onDiskMapOutputs
+                                       ) throws IOException {
+    LOG.info("finalMerge called with " +
+        inMemoryMapOutputs.size() + " in-memory map-outputs and " +
+        onDiskMapOutputs.size() + " on-disk map-outputs");
+    final long maxInMemReduce = getMaxInMemReduceLimit();
     // merge config params
     Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
     Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41601eae/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
index 8d6bab9..ef860af 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
@@ -260,4 +260,33 @@ public class TestMergeManager {
     }
 
   }
+
+  @Test
+  public void testLargeMemoryLimits() throws Exception {
+    final JobConf conf = new JobConf();
+    // Xmx in production
+    conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        8L * 1024 * 1024 * 1024);
+
+    // M1 = Xmx fraction for map outputs
+    conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
+
+    // M2 = max M1 fraction for a single map output
+    conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);
+
+    // M3 = M1 fraction at which in memory merge is triggered
+    conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);
+
+    // M4 = M1 fraction of map outputs remaining in memory for a reduce
+    conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);
+
+    final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>(
+        null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
+        null, null, null, null, null, new MROutputFiles());
+    assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
+        mgr.memoryLimit > Integer.MAX_VALUE);
+    final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
+    assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
+        maxInMemReduce > Integer.MAX_VALUE);
+  }
 }


Mime
View raw message