hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject svn commit: r1526677 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/...
Date Thu, 26 Sep 2013 20:58:00 GMT
Author: jeagles
Date: Thu Sep 26 20:58:00 2013
New Revision: 1526677

URL: http://svn.apache.org/r1526677
Log:
MAPREDUCE-5543. In-memory map outputs can be leaked after shuffle completes in 0.23 (Jason Lowe via jeagles)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1526677&r1=1526676&r2=1526677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Sep 26 20:58:00 2013
@@ -43,6 +43,9 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
     via tgraves)
 
+    MAPREDUCE-5543. In-memory map outputs can be leaked after shuffle completes
+    in 0.23 (Jason Lowe via jeagles)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1526677&r1=1526676&r2=1526677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Thu Sep 26 20:58:00 2013
@@ -356,8 +356,11 @@ public class MergeManager<K, V> {
     
     List<MapOutput<K, V>> memory = 
       new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs);
+    inMemoryMergedMapOutputs.clear();
     memory.addAll(inMemoryMapOutputs);
+    inMemoryMapOutputs.clear();
     List<Path> disk = getDiskMapOutputs();
+    onDiskMapOutputs.clear();
     return finalMerge(jobConf, rfs, memory, disk);
   }
 
@@ -671,7 +674,8 @@ public class MergeManager<K, V> {
     }
   }
 
-  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+  @VisibleForTesting
+  RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                        List<MapOutput<K,V>> inMemoryMapOutputs,
                                        List<Path> onDiskMapOutputs
                                        ) throws IOException {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1526677&r1=1526676&r2=1526677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Thu Sep 26 20:58:00 2013
@@ -39,7 +39,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.junit.Assert;
 import org.junit.Test;
@@ -119,6 +123,45 @@ public class TestMergeManager {
         0, reporter.getNumExceptions());
   }
 
+  @Test
+  public void testFinalMergeFreesMemory() throws Throwable {
+    JobConf conf = new JobConf();
+    TestExceptionReporter reporter = new TestExceptionReporter();
+    CyclicBarrier mergeStart = new CyclicBarrier(2);
+    CyclicBarrier mergeComplete = new CyclicBarrier(2);
+    StubbedMergeManager mgr = new StubbedMergeManager(conf, reporter,
+        mergeStart, mergeComplete) {
+          // Skip the real final merge (return null); only cleanup is tested.
+          @Override
+          RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+              List<MapOutput<Text, Text>> inMemoryMapOutputs,
+              List<Path> onDiskMapOutputs) throws IOException {
+            return null;
+          }
+    };
+    // Commit two in-memory map outputs so the manager has outputs to track.
+    // NOTE(review): confirm whether these commits also trigger a memory merge.
+    TaskAttemptID mapId1 = new TaskAttemptID(
+        new TaskID("job_1234", 0, TaskType.MAP, 0), 0);
+    MapOutput<Text, Text> out1 = mgr.reserve(mapId1, 1000, 0);
+    Assert.assertEquals("Should be a memory merge",
+        Type.MEMORY, out1.getType());
+    fillOutput(out1);
+    out1.commit();
+    TaskAttemptID mapId2 = new TaskAttemptID(
+        new TaskID("job_1234", 0, TaskType.MAP, 1), 0);
+    MapOutput<Text, Text> out2 = mgr.reserve(mapId2, 1000, 0);
+    Assert.assertEquals("Should be a memory merge",
+        Type.MEMORY, out2.getType());
+    fillOutput(out2);
+    out2.commit();
+    // close() must leave no map outputs referenced (MAPREDUCE-5543).
+    mgr.close();
+    Assert.assertEquals(0, mgr.inMemoryMapOutputs.size());
+    Assert.assertEquals(0, mgr.inMemoryMergedMapOutputs.size());
+    Assert.assertEquals(0, mgr.onDiskMapOutputs.size());
+  }
+
   private void fillOutput(MapOutput<Text, Text> output) throws IOException {
     BoundedByteArrayOutputStream stream = output.getArrayStream();
     int count = stream.getLimit();



Mime
View raw message