hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r779571 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java src/mapred/org/apache/hadoop/mapred/Merger.java src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Date Thu, 28 May 2009 11:33:56 GMT
Author: ddas
Date: Thu May 28 11:33:56 2009
New Revision: 779571

URL: http://svn.apache.org/viewvc?rev=779571&view=rev
Log:
HADOOP-5539. Fixes a problem to do with not preserving intermediate output compression for
merged data. Contributed by Jothi Padmanabhan and Billy Pearson.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779571&r1=779570&r2=779571&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 28 11:33:56 2009
@@ -796,6 +796,10 @@
     momentary spurts in memory usage due to java's fork() model.
     (yhemanth)
 
+    HADOOP-5539. Fixes a problem to do with not preserving intermediate
+    output compression for merged data.
+    (Jothi Padmanabhan and Billy Pearson via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=779571&r1=779570&r2=779571&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu May 28 11:33:56 2009
@@ -1434,7 +1434,7 @@
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                         keyClass, valClass,
+                         keyClass, valClass, codec,
                          segmentList, mergeFactor,
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter, sortSegments,

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=779571&r1=779570&r2=779571&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Thu May 28 11:33:56 2009
@@ -97,6 +97,25 @@
                                                mergePhase);
   }
 
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
+                            CompressionCodec codec,
+                            List<Segment<K, V>> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter,
+                            boolean sortSegments,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                           sortSegments, codec).merge(keyClass, valueClass,
+                                               mergeFactor, tmpDir,
+                                               readsCounter, writesCounter,
+                                               mergePhase);
+  }
+
   static <K extends Object, V extends Object>
     RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class<K> keyClass, Class<V> valueClass,
@@ -116,6 +135,27 @@
                                                mergePhase);
   }
 
+
+  static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class<K> keyClass, Class<V> valueClass,
+                          CompressionCodec codec,
+                          List<Segment<K, V>> segments,
+                          int mergeFactor, int inMemSegments, Path tmpDir,
+                          RawComparator<K> comparator, Progressable reporter,
+                          boolean sortSegments,
+                          Counters.Counter readsCounter,
+                          Counters.Counter writesCounter,
+                          Progress mergePhase)
+    throws IOException {
+  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                         sortSegments, codec).merge(keyClass, valueClass,
+                                             mergeFactor, inMemSegments,
+                                             tmpDir,
+                                             readsCounter, writesCounter,
+                                             mergePhase);
+}
+
   public static <K extends Object, V extends Object>
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
                  Progressable progressable, Configuration conf) 
@@ -326,6 +366,13 @@
       }
     }
 
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
     public void close() throws IOException {
       Segment<K, V> segment;
       while((segment = pop()) != null) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=779571&r1=779570&r2=779571&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu May 28 11:33:56 2009
@@ -2323,7 +2323,7 @@
         memDiskSegments.clear();
         Progress mergePhase = (sortPhaseFinished) ? null : sortPhase; 
         RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, diskSegments,
+            job, fs, keyClass, valueClass, codec, diskSegments,
             ioSortFactor, 0 == numInMemSegments ? 0 : numInMemSegments - 1,
             tmpDir, comparator, reporter, false, spilledRecordsCounter, null,
             mergePhase);



Mime
View raw message