hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1453456 - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/...
Date Wed, 06 Mar 2013 17:51:33 GMT
Author: suresh
Date: Wed Mar  6 17:51:32 2013
New Revision: 1453456

URL: http://svn.apache.org/r1453456
Log:
Merging trunk to branch-trunk-win

Modified:
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1453120-1453453

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1453456&r1=1453455&r2=1453456&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Wed Mar  6 17:51:32 2013
@@ -201,6 +201,10 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-4896. mapred queue -info spits out ugly exception when queue does 
     not exist. (sandyr via tucu)
 
+    MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is
+    appropriately used and that on-disk segments are correctly sorted on
+    file-size. (Anty Rao and Ravi Prakash via acmurthy) 
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1453120-1453453

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1453120-1453453

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1453456&r1=1453455&r2=1453456&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Wed Mar  6 17:51:32 2013
@@ -169,7 +169,7 @@ public class Merger {  
   }
 
 
-  static <K extends Object, V extends Object>
+  public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                           Class<K> keyClass, Class<V> valueClass,
                           CompressionCodec codec,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1453456&r1=1453455&r2=1453456&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Wed Mar  6 17:51:32 2013
@@ -477,7 +477,7 @@ public class MergeManagerImpl<K, V> impl
         }
         writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
+            writer.getRawLength(), writer.getCompressedLength());
 
         LOG.info(reduceId +  
             " Merge of the " + noInMemorySegments +
@@ -500,7 +500,7 @@ public class MergeManagerImpl<K, V> impl
   private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
     
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      super(manager, ioSortFactor, exceptionReporter);
       setName("OnDiskMerger - Thread to merge on-disk map-outputs");
       setDaemon(true);
     }
@@ -554,7 +554,7 @@ public class MergeManagerImpl<K, V> impl
         Merger.writeFile(iter, writer, reporter, jobConf);
         writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
+            writer.getRawLength(), writer.getCompressedLength());
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -719,7 +719,7 @@ public class MergeManagerImpl<K, V> impl
           Merger.writeFile(rIter, writer, reporter, job);
           writer.close();
           onDiskMapOutputs.add(new CompressAwarePath(outputPath,
-              writer.getRawLength()));
+              writer.getRawLength(), writer.getCompressedLength()));
           writer = null;
           // add to list of final disk outputs.
         } catch (IOException e) {
@@ -791,7 +791,7 @@ public class MergeManagerImpl<K, V> impl
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
+          job, fs, keyClass, valueClass, codec, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           reporter, false, spilledRecordsCounter, null, thisPhase);
       diskSegments.clear();
@@ -810,24 +810,45 @@ public class MergeManagerImpl<K, V> impl
 
   static class CompressAwarePath extends Path {
     private long rawDataLength;
+    private long compressedSize;
 
-    public CompressAwarePath(Path path, long rawDataLength) {
+    public CompressAwarePath(Path path, long rawDataLength, long compressSize) {
       super(path.toUri());
       this.rawDataLength = rawDataLength;
+      this.compressedSize = compressSize;
     }
 
     public long getRawDataLength() {
       return rawDataLength;
     }
-    
+
+    public long getCompressedSize() {
+      return compressedSize;
+    }
+
     @Override
     public boolean equals(Object other) {
       return super.equals(other);
     }
-    
+
     @Override
     public int hashCode() {
       return super.hashCode();
     }
+
+    @Override
+    public int compareTo(Object obj) {
+      if(obj instanceof CompressAwarePath) {
+        CompressAwarePath compPath = (CompressAwarePath) obj;
+        if(this.compressedSize < compPath.getCompressedSize()) {
+          return -1;
+        } else if (this.getCompressedSize() > compPath.getCompressedSize()) {
+          return 1;
+        }
+        // Not returning 0 here so that objects with the same size (but
+        // different paths) are still added to the TreeSet.
+      }
+      return super.compareTo(obj);
+    }
   }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1453456&r1=1453455&r2=1453456&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Wed Mar  6 17:51:32 2013
@@ -48,6 +48,7 @@ class OnDiskMapOutput<K, V> extends MapO
   private final Path outputPath;
   private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk; 
+  private long compressedSize;
 
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K, V> merger, long size,
@@ -108,13 +109,14 @@ class OnDiskMapOutput<K, V> extends MapO
                             bytesLeft + " bytes missing of " + 
                             compressedLength + ")");
     }
+    this.compressedSize = compressedLength;
   }
 
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
     CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
-        getSize());
+        getSize(), this.compressedSize);
     merger.closeOnDiskFile(compressAwarePath);
   }
   

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1453120-1453453

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1453456&r1=1453455&r2=1453456&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Wed Mar  6 17:51:32 2013
@@ -17,28 +17,38 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestMergeManager {
 
   @Test(timeout=10000)
-  @SuppressWarnings("unchecked")
   public void testMemoryMerge() throws Exception {
     final int TOTAL_MEM_BYTES = 10000;
     final int OUTPUT_SIZE = 7950;
@@ -195,4 +205,59 @@ public class TestMergeManager {
       return exceptions.size();
     }
   }
+
+  @SuppressWarnings({ "unchecked", "deprecation" })
+  @Test(timeout=10000)
+  public void testOnDiskMerger() throws IOException, URISyntaxException,
+    InterruptedException {
+    JobConf jobConf = new JobConf();
+    final int SORT_FACTOR = 5;
+    jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+    MapOutputFile mapOutputFile = new MROutputFiles();
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    MergeManagerImpl<IntWritable, IntWritable> manager =
+      new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
+        , null, null, null, null, null, null, null, null, null, mapOutputFile);
+
+    MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+      onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+        IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+          "onDiskMerger");
+    int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+      "mergeFactor");
+
+    // make sure the io.sort.factor is set properly
+    assertEquals(mergeFactor, SORT_FACTOR);
+
+    // Stop the onDiskMerger thread so that we can intercept the list of files
+    // waiting to be merged.
+    onDiskMerger.suspend();
+
+    //Send the list of fake files waiting to be merged
+    Random rand = new Random();
+    for(int i = 0; i < 2*SORT_FACTOR; ++i) {
+      Path path = new Path("somePath");
+      CompressAwarePath cap = new CompressAwarePath(path, 1l, rand.nextInt());
+      manager.closeOnDiskFile(cap);
+    }
+
+    //Check that the files pending to be merged are in sorted order.
+    LinkedList<List<CompressAwarePath>> pendingToBeMerged =
+      (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
+        onDiskMerger, "pendingToBeMerged");
+    assertTrue("No inputs were added to list pending to merge",
+      pendingToBeMerged.size() > 0);
+    for(int i = 0; i < pendingToBeMerged.size(); ++i) {
+      List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
+      for(int j = 1; j < inputs.size(); ++j) {
+        assertTrue("Not enough / too many inputs were going to be merged",
+          inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+        assertTrue("Inputs to be merged were not sorted according to size: ",
+          inputs.get(j).getCompressedSize()
+          >= inputs.get(j-1).getCompressedSize());
+      }
+    }
+
+  }
 }



Mime
View raw message