hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r474922 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/java/org/apache/hadoop/mapred/ReduceTask.java
Date Tue, 14 Nov 2006 19:09:17 GMT
Author: cutting
Date: Tue Nov 14 11:09:16 2006
New Revision: 474922

URL: http://svn.apache.org/viewvc?view=rev&rev=474922
Log:
HADOOP-613.  Perform final merge while reducing.  Contributed by Devaraj.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 14 11:09:16 2006
@@ -56,6 +56,10 @@
 17. HADOOP-705.  Fix a bug in the JobTracker when failed jobs were
     not completely cleaned up.  (Mahadev Konar via cutting)
 
+18. HADOOP-613.  Perform final merge while reducing.  This removes one
+    sort pass over the data and should consequently significantly
+    decrease overall processing time.  (Devaraj Das via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Nov 14 11:09:16
2006
@@ -33,6 +33,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Support for flat files of binary key/value pairs. */
@@ -1538,6 +1539,34 @@
       }
     }
 
+    /** 
+     * Perform a file sort from a set of input files and return an iterator.
+     * @param inFiles the files to be sorted
+     * @param tempDir the directory where temp files are created during sort
+     * @param deleteInput should the input files be deleted as they are read?
+     * @return iterator the RawKeyValueIterator
+     */
+    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
+                                    boolean deleteInput) throws IOException {
+      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
+      if (fs.exists(outFile)) {
+        throw new IOException("already exists: " + outFile);
+      }
+      this.inFiles = inFiles;
+      //outFile will basically be used as prefix for temp files in the cases
+      //where sort outputs multiple sorted segments. For the single segment
+      //case, the outputFile itself will contain the sorted data for that
+      //segment
+      this.outFile = outFile;
+
+      int segments = sortPass(deleteInput);
+      if (segments > 1)
+        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"));
+      else if (segments == 1)
+        return merge(new Path[]{outFile}, true);
+      else return null;
+    }
+
     /**
      * The backwards compatible interface to sort.
      * @param inFile the input file to sort
@@ -1799,6 +1828,10 @@
        * @throws IOException
        */
       void close() throws IOException;
+      /** Gets the Progress object; this has a float (0.0 - 1.0) 
+       * indicating the bytes processed by the iterator so far
+       */
+      Progress getProgress();
     }    
     
   /**
@@ -1941,6 +1974,9 @@
       private boolean blockCompress;
       private DataOutputBuffer rawKey = new DataOutputBuffer();
       private ValueBytes rawValue;
+      private long totalBytesProcessed;
+      private float progPerByte;
+      private Progress mergeProgress = new Progress();
       
       //a TreeMap used to store the segments sorted by size (segment offset and
       //segment path name is used to break ties between segments of same sizes)
@@ -1992,7 +2028,7 @@
         //load the raw value. Re-use the existing rawValue buffer
         if(rawValue == null)
           rawValue = ms.in.createValueBytes();
-        ms.nextRawValue(rawValue);
+        int valLength = ms.nextRawValue(rawValue);
 
         if (ms.nextRawKey()) {
           adjustTop();
@@ -2000,9 +2036,17 @@
           pop();
           ms.cleanup();
         }
+        if (progPerByte > 0) {
+          totalBytesProcessed += rawKey.getLength() + valLength;
+          mergeProgress.set(totalBytesProcessed * progPerByte);
+        }
         return true;
       }
       
+      public Progress getProgress() {
+        return mergeProgress; 
+      }
+
       /** This is the single level merge that is called multiple times 
        * depending on the factor size and the number of segments
        * @return RawKeyValueIterator
@@ -2029,6 +2073,13 @@
           //if we have lesser number of segments remaining, then just return the
           //iterator, else do another single level merge
           if (numSegments <= factor) {
+            //calculate the length of the remaining segments. Required for 
+            //calculating the merge progress
+            long totalBytes = 0;
+            for (int i = 0; i < numSegments; i++)
+              totalBytes += mStream[i].segmentLength;
+            if (totalBytes != 0) //being paranoid
+              progPerByte = 1.0f / (float)totalBytes;
             return this;
           } else {
             //we want to spread the creation of temp files on multiple disks if 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=474922&r1=474921&r2=474922
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 14 11:09:16
2006
@@ -122,23 +122,28 @@
 
   /** Iterates values while keys match in sorted input. */
   private class ValuesIterator implements Iterator {
-    private SequenceFile.Reader in;               // input file
+    private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
     private WritableComparable key;               // current key
     private Writable value;                       // current value
     private boolean hasNext;                      // more w/ this key
     private boolean more;                         // more in file
-    private float progPerByte;
     private TaskUmbilicalProtocol umbilical;
     private WritableComparator comparator;
-
-    public ValuesIterator (SequenceFile.Reader in, long length,
-                           WritableComparator comparator,
-                           TaskUmbilicalProtocol umbilical)
+    private Class keyClass;
+    private Class valClass;
+    private DataOutputBuffer valOut = new DataOutputBuffer();
+    private DataInputBuffer valIn = new DataInputBuffer();
+    private DataInputBuffer keyIn = new DataInputBuffer();
+
+    public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
+                           WritableComparator comparator, Class keyClass,
+                           Class valClass, TaskUmbilicalProtocol umbilical)
       throws IOException {
       this.in = in;
-      this.progPerByte = 1.0f / (float)length;
       this.umbilical = umbilical;
       this.comparator = comparator;
+      this.keyClass = keyClass;
+      this.valClass = valClass;
       getNext();
     }
 
@@ -173,18 +178,26 @@
     public WritableComparable getKey() { return key; }
 
     private void getNext() throws IOException {
-      reducePhase.set(in.getPosition()*progPerByte); // update progress
+      reducePhase.set(in.getProgress().get()); // update progress
       reportProgress(umbilical);
 
       Writable lastKey = key;                     // save previous key
       try {
-        key = (WritableComparable)in.getKeyClass().newInstance();
-        value = (Writable)in.getValueClass().newInstance();
+        key = (WritableComparable)keyClass.newInstance();
+        value = (Writable)valClass.newInstance();
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      more = in.next(key, value);
+      more = in.next();
       if (more) {
+        //de-serialize the raw key/value
+        keyIn.reset(in.getKey().getData(), in.getKey().getLength());
+        key.readFields(keyIn);
+        valOut.reset();
+        (in.getValue()).writeUncompressedBytes(valOut);
+        valIn.reset(valOut.getData(), valOut.getLength());
+        value.readFields(valIn);
+
         if (lastKey == null) {
           hasNext = true;
         } else {
@@ -233,10 +246,12 @@
       };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
 
-    Path sortedFile = job.getLocalPath(getTaskId()+Path.SEPARATOR+"all.2");
+    Path tempDir = job.getLocalPath(getTaskId()); 
 
     WritableComparator comparator = job.getOutputKeyComparator();
-    
+   
+    SequenceFile.Sorter.RawKeyValueIterator rIter;
+ 
     try {
       setPhase(TaskStatus.Phase.SORT) ; 
       sortProgress.start();
@@ -244,7 +259,8 @@
       // sort the input file
       SequenceFile.Sorter sorter =
         new SequenceFile.Sorter(lfs, comparator, valueClass, job);
-      sorter.sort(mapFiles, sortedFile, !conf.getKeepFailedTaskFiles()); // sort
+      rIter = sorter.sortAndIterate(mapFiles, tempDir, 
+                                    !conf.getKeepFailedTaskFiles()); // sort
 
     } finally {
       sortComplete = true;
@@ -269,11 +285,11 @@
       };
     
     // apply reduce function
-    SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
-    long length = lfs.getLength(sortedFile);
     try {
-      ValuesIterator values = new ValuesIterator(in, length, comparator,
-                                                 umbilical);
+      Class keyClass = job.getMapOutputKeyClass();
+      Class valClass = job.getMapOutputValueClass();
+      ValuesIterator values = new ValuesIterator(rIter, comparator, keyClass, 
+                                                 valClass, umbilical);
       while (values.more()) {
         myMetrics.reduceInput();
         reducer.reduce(values.getKey(), values, collector, reporter);
@@ -282,8 +298,6 @@
 
     } finally {
       reducer.close();
-      in.close();
-      lfs.delete(sortedFile);                     // remove sorted
       out.close(reporter);
     }
     done(umbilical);



Mime
View raw message