hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r779572 - in /hadoop/core/branches/branch-0.20: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java src/mapred/org/apache/hadoop/mapred/Merger.java src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Date Thu, 28 May 2009 11:37:05 GMT
Author: ddas
Date: Thu May 28 11:37:05 2009
New Revision: 779572

URL: http://svn.apache.org/viewvc?rev=779572&view=rev
Log:
HADOOP-5539. Fixes a problem to do with not preserving intermediate output compression for
merged data. Contributed by Jothi Padmanabhan and Billy Pearson.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=779572&r1=779571&r2=779572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu May 28 11:37:05 2009
@@ -93,6 +93,10 @@
     momentary spurts in memory usage due to java's fork() model.
     (yhemanth)
 
+    HADOOP-5539. Fixes a problem to do with not preserving intermediate
+    output compression for merged data.
+    (Jothi Padmanabhan and Billy Pearson via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=779572&r1=779571&r2=779572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu May 28 11:37:05 2009
@@ -1415,7 +1415,7 @@
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                         keyClass, valClass,
+                         keyClass, valClass, codec,
                          segmentList, job.getInt("io.sort.factor", 100),
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter,

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=779572&r1=779571&r2=779572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java Thu May 28 11:37:05 2009
@@ -65,6 +65,23 @@
   }
   
   public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                                   Class<K> keyClass, Class<V> valueClass,
+                                   CompressionCodec codec,
+                                   List<Segment<K, V>> segments, 
+                                   int mergeFactor, Path tmpDir,
+                                   RawComparator<K> comparator, Progressable reporter,
+                                   Counters.Counter readsCounter,
+                                   Counters.Counter writesCounter)
+      throws IOException {
+    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+        false, codec).merge(keyClass, valueClass,
+            mergeFactor, tmpDir,
+            readsCounter, writesCounter);
+
+  }
+
+  public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs, 
                             Class<K> keyClass, Class<V> valueClass, 
                             List<Segment<K, V>> segments, 
@@ -110,6 +127,25 @@
                                                readsCounter, writesCounter);
   }
 
+
+  static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class<K> keyClass, Class<V> valueClass,
+                          CompressionCodec codec,
+                          List<Segment<K, V>> segments,
+                          int mergeFactor, int inMemSegments, Path tmpDir,
+                          RawComparator<K> comparator, Progressable reporter,
+                          boolean sortSegments,
+                          Counters.Counter readsCounter,
+                          Counters.Counter writesCounter)
+    throws IOException {
+  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                         sortSegments, codec).merge(keyClass, valueClass,
+                                             mergeFactor, inMemSegments,
+                                             tmpDir,
+                                             readsCounter, writesCounter);
+}
+
   public static <K extends Object, V extends Object>
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
                  Progressable progressable, Configuration conf) 
@@ -267,6 +303,13 @@
       }
     }
 
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
     public void close() throws IOException {
       Segment<K, V> segment;
       while((segment = pop()) != null) {

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=779572&r1=779571&r2=779572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu May 28 11:37:05 2009
@@ -2234,7 +2234,7 @@
         diskSegments.addAll(0, memDiskSegments);
         memDiskSegments.clear();
         RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, diskSegments,
+            job, fs, keyClass, valueClass, codec, diskSegments,
             ioSortFactor, numInMemSegments, tmpDir, comparator,
             reporter, false, spilledRecordsCounter, null);
         diskSegments.clear();



Mime
View raw message